refactor(client): make conn::ResponseFuture implement Send

This commit is contained in:
Sean McArthur
2018-03-13 16:34:12 -07:00
parent dcfebc6308
commit 26ec18a282
3 changed files with 63 additions and 19 deletions

View File

@@ -79,7 +79,7 @@ pub struct Handshake<T, B> {
pub struct ResponseFuture { pub struct ResponseFuture {
// for now, a Box is used to hide away the internal `B` // for now, a Box is used to hide away the internal `B`
// that can be returned if canceled // that can be returned if canceled
inner: Box<Future<Item=Response, Error=::Error>>, inner: Box<Future<Item=Response, Error=::Error> + Send>,
} }
/// Deconstructed parts of a `Connection`. /// Deconstructed parts of a `Connection`.
@@ -191,10 +191,25 @@ where
// It's important that this method isn't called directly from the // It's important that this method isn't called directly from the
// `Client`, so that `set_proxy` there is still respected. // `Client`, so that `set_proxy` there is still respected.
req.set_proxy(true); req.set_proxy(true);
let inner = self.send_request_retryable(req).map_err(|e| {
let (err, _orig_req) = e; let (head, body) = proto::request::split(req);
err let inner = match self.dispatch.send((head, body)) {
}); Ok(rx) => {
Either::A(rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
},
Err(_req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled(Some("connection was not ready"));
Either::B(future::err(err))
}
};
ResponseFuture { ResponseFuture {
inner: Box::new(inner), inner: Box::new(inner),
} }
@@ -203,7 +218,7 @@ where
//TODO: replace with `impl Future` when stable //TODO: replace with `impl Future` when stable
pub(crate) fn send_request_retryable(&mut self, req: Request<B>) -> Box<Future<Item=Response, Error=(::Error, Option<(::proto::RequestHead, Option<B>)>)>> { pub(crate) fn send_request_retryable(&mut self, req: Request<B>) -> Box<Future<Item=Response, Error=(::Error, Option<(::proto::RequestHead, Option<B>)>)>> {
let (head, body) = proto::request::split(req); let (head, body) = proto::request::split(req);
let inner = match self.dispatch.send((head, body)) { let inner = match self.dispatch.try_send((head, body)) {
Ok(rx) => { Ok(rx) => {
Either::A(rx.then(move |res| { Either::A(rx.then(move |res| {
match res { match res {
@@ -480,8 +495,5 @@ where
#[doc(hidden)] #[doc(hidden)]
impl AssertSendSync for Builder {} impl AssertSendSync for Builder {}
// TODO: This could be done by using a dispatch channel that doesn't #[doc(hidden)]
// return the `B` on Error, removing the possibility of contains some !Send impl AssertSend for ResponseFuture {}
// thing.
//#[doc(hidden)]
//impl AssertSend for ResponseFuture {}

View File

@@ -4,8 +4,9 @@ use futures::sync::{mpsc, oneshot};
use common::Never; use common::Never;
use super::signal; use super::signal;
pub type Callback<T, U> = oneshot::Sender<Result<U, (::Error, Option<T>)>>; //pub type Callback<T, U> = oneshot::Sender<Result<U, (::Error, Option<T>)>>;
pub type Promise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>; pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>;
pub type Promise<T> = oneshot::Receiver<Result<T, ::Error>>;
pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) { pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (tx, rx) = mpsc::channel(0); let (tx, rx) = mpsc::channel(0);
@@ -48,9 +49,16 @@ impl<T, U> Sender<T, U> {
self.giver.is_canceled() self.giver.is_canceled()
} }
pub fn send(&mut self, val: T) -> Result<Promise<T, U>, T> { pub fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.inner.try_send((val, tx)) self.inner.try_send((val, Callback::Retry(tx)))
.map(move |_| rx)
.map_err(|e| e.into_inner().0)
}
pub fn send(&mut self, val: T) -> Result<Promise<U>, T> {
let (tx, rx) = oneshot::channel();
self.inner.try_send((val, Callback::NoRetry(tx)))
.map(move |_| rx) .map(move |_| rx)
.map_err(|e| e.into_inner().0) .map_err(|e| e.into_inner().0)
} }
@@ -98,6 +106,31 @@ impl<T, U> Drop for Receiver<T, U> {
} }
pub enum Callback<T, U> {
Retry(oneshot::Sender<Result<U, (::Error, Option<T>)>>),
NoRetry(oneshot::Sender<Result<U, ::Error>>),
}
impl<T, U> Callback<T, U> {
pub fn poll_cancel(&mut self) -> Poll<(), ()> {
match *self {
Callback::Retry(ref mut tx) => tx.poll_cancel(),
Callback::NoRetry(ref mut tx) => tx.poll_cancel(),
}
}
pub fn send(self, val: Result<U, (::Error, Option<T>)>) {
match self {
Callback::Retry(tx) => {
let _ = tx.send(val);
},
Callback::NoRetry(tx) => {
let _ = tx.send(val.map_err(|e| e.0));
}
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
extern crate pretty_env_logger; extern crate pretty_env_logger;
@@ -118,7 +151,7 @@ mod tests {
struct Custom(i32); struct Custom(i32);
let (mut tx, rx) = super::channel::<Custom, ()>(); let (mut tx, rx) = super::channel::<Custom, ()>();
let promise = tx.send(Custom(43)).unwrap(); let promise = tx.try_send(Custom(43)).unwrap();
drop(rx); drop(rx);
promise.then(|fulfilled| { promise.then(|fulfilled| {

View File

@@ -2,7 +2,6 @@ use std::io;
use bytes::Bytes; use bytes::Bytes;
use futures::{Async, AsyncSink, Future, Poll, Stream}; use futures::{Async, AsyncSink, Future, Poll, Stream};
use futures::sync::oneshot;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_service::Service; use tokio_service::Service;
@@ -33,7 +32,7 @@ pub struct Server<S: Service> {
} }
pub struct Client<B> { pub struct Client<B> {
callback: Option<oneshot::Sender<Result<::Response, (::Error, Option<ClientMsg<B>>)>>>, callback: Option<::client::dispatch::Callback<ClientMsg<B>, ::Response>>,
rx: ClientRx<B>, rx: ClientRx<B>,
} }
@@ -475,7 +474,7 @@ mod tests {
subject: ::proto::RequestLine::default(), subject: ::proto::RequestLine::default(),
headers: Default::default(), headers: Default::default(),
}; };
let res_rx = tx.send((req, None::<::Body>)).unwrap(); let res_rx = tx.try_send((req, None::<::Body>)).unwrap();
let a1 = dispatcher.poll().expect("error should be sent on channel"); let a1 = dispatcher.poll().expect("error should be sent on channel");
assert!(a1.is_ready(), "dispatcher should be closed"); assert!(a1.is_ready(), "dispatcher should be closed");