From 26ec18a282f2e1731f7b4233f921ccb2bf950981 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 13 Mar 2018 16:34:12 -0700 Subject: [PATCH] refactor(client): make conn::ResponseFuture implement Send --- src/client/conn.rs | 34 +++++++++++++++++++++---------- src/client/dispatch.rs | 43 +++++++++++++++++++++++++++++++++++----- src/proto/h1/dispatch.rs | 5 ++--- 3 files changed, 63 insertions(+), 19 deletions(-) diff --git a/src/client/conn.rs b/src/client/conn.rs index cf194fe1..d35b74cf 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -79,7 +79,7 @@ pub struct Handshake { pub struct ResponseFuture { // for now, a Box is used to hide away the internal `B` // that can be returned if canceled - inner: Box>, + inner: Box + Send>, } /// Deconstructed parts of a `Connection`. @@ -191,10 +191,25 @@ where // It's important that this method isn't called directly from the // `Client`, so that `set_proxy` there is still respected. req.set_proxy(true); - let inner = self.send_request_retryable(req).map_err(|e| { - let (err, _orig_req) = e; - err - }); + + let (head, body) = proto::request::split(req); + let inner = match self.dispatch.send((head, body)) { + Ok(rx) => { + Either::A(rx.then(move |res| { + match res { + Ok(Ok(res)) => Ok(res), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + } + })) + }, + Err(_req) => { + debug!("connection was not ready"); + let err = ::Error::new_canceled(Some("connection was not ready")); + Either::B(future::err(err)) + } + }; ResponseFuture { inner: Box::new(inner), } @@ -203,7 +218,7 @@ where //TODO: replace with `impl Future` when stable pub(crate) fn send_request_retryable(&mut self, req: Request) -> Box)>)>> { let (head, body) = proto::request::split(req); - let inner = match self.dispatch.send((head, body)) { + let inner = match self.dispatch.try_send((head, body)) { Ok(rx) => { Either::A(rx.then(move |res| { match res { @@ -480,8 +495,5 @@ where #[doc(hidden)] impl AssertSendSync for Builder {} -// TODO: This could be done by using a dispatch channel that doesn't -// return the `B` on Error, removing the possibility of contains some !Send -// thing. -//#[doc(hidden)] -//impl AssertSend for ResponseFuture {} +#[doc(hidden)] +impl AssertSend for ResponseFuture {} diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 80ed8e1c..efc720d8 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -4,8 +4,9 @@ use futures::sync::{mpsc, oneshot}; use common::Never; use super::signal; -pub type Callback = oneshot::Sender)>>; -pub type Promise = oneshot::Receiver)>>; +//pub type Callback = oneshot::Sender)>>; +pub type RetryPromise = oneshot::Receiver)>>; +pub type Promise = oneshot::Receiver>; pub fn channel() -> (Sender, Receiver) { let (tx, rx) = mpsc::channel(0); @@ -48,9 +49,16 @@ impl Sender { self.giver.is_canceled() } - pub fn send(&mut self, val: T) -> Result, T> { + pub fn try_send(&mut self, val: T) -> Result, T> { let (tx, rx) = oneshot::channel(); - self.inner.try_send((val, tx)) + self.inner.try_send((val, Callback::Retry(tx))) + .map(move |_| rx) + .map_err(|e| e.into_inner().0) + } + + pub fn send(&mut self, val: T) -> Result, T> { + let (tx, rx) = oneshot::channel(); + self.inner.try_send((val, Callback::NoRetry(tx))) .map(move |_| rx) .map_err(|e| e.into_inner().0) } @@ -98,6 +106,31 @@ impl Drop for Receiver { } +pub enum Callback { + Retry(oneshot::Sender)>>), + NoRetry(oneshot::Sender>), +} + +impl Callback { + pub fn poll_cancel(&mut self) -> Poll<(), ()> { + match *self { + Callback::Retry(ref mut tx) => tx.poll_cancel(), + Callback::NoRetry(ref mut tx) => tx.poll_cancel(), + } + } + + pub fn send(self, val: Result)>) { + match self { + Callback::Retry(tx) => { + let _ = tx.send(val); + }, + Callback::NoRetry(tx) => { + let _ = tx.send(val.map_err(|e| e.0)); + } + } + } +} + #[cfg(test)] mod tests { extern crate pretty_env_logger; @@ -118,7 +151,7 @@ mod tests { struct Custom(i32); let (mut tx, rx) = super::channel::(); - let promise = tx.send(Custom(43)).unwrap(); + let promise = tx.try_send(Custom(43)).unwrap(); drop(rx); promise.then(|fulfilled| { diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 7023dc29..9b10d559 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -2,7 +2,6 @@ use std::io; use bytes::Bytes; use futures::{Async, AsyncSink, Future, Poll, Stream}; -use futures::sync::oneshot; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_service::Service; @@ -33,7 +32,7 @@ pub struct Server { } pub struct Client { - callback: Option>)>>>, + callback: Option<::client::dispatch::Callback, ::Response>>, rx: ClientRx, } @@ -475,7 +474,7 @@ mod tests { subject: ::proto::RequestLine::default(), headers: Default::default(), }; - let res_rx = tx.send((req, None::<::Body>)).unwrap(); + let res_rx = tx.try_send((req, None::<::Body>)).unwrap(); let a1 = dispatcher.poll().expect("error should be sent on channel"); assert!(a1.is_ready(), "dispatcher should be closed");