join on client thread when all clients have been dropped, take two

This commit is contained in:
James Brown
2017-08-17 09:35:30 -07:00
parent 7bfa7adb49
commit 5071438c08

View File

@@ -1,6 +1,7 @@
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::thread;
use futures::{Future, Stream}; use futures::{Future, Stream};
use futures::sync::{mpsc, oneshot}; use futures::sync::{mpsc, oneshot};
@@ -313,21 +314,32 @@ impl fmt::Debug for ClientBuilder {
struct ClientHandle { struct ClientHandle {
gzip: bool, gzip: bool,
timeout: Option<Duration>, timeout: Option<Duration>,
tx: Arc<ThreadSender> inner: Arc<InnerClientHandle>
} }
type ThreadSender = mpsc::UnboundedSender<(async_impl::Request, oneshot::Sender<::Result<async_impl::Response>>)>; type ThreadSender = mpsc::UnboundedSender<(async_impl::Request, oneshot::Sender<::Result<async_impl::Response>>)>;
struct InnerClientHandle {
tx: Option<ThreadSender>,
thread: Option<thread::JoinHandle<()>>
}
impl Drop for InnerClientHandle {
fn drop(&mut self) {
self.tx.take();
self.thread.take().map(|h| h.join());
}
}
impl ClientHandle { impl ClientHandle {
fn new(builder: &mut ClientBuilder) -> ::Result<ClientHandle> { fn new(builder: &mut ClientBuilder) -> ::Result<ClientHandle> {
use std::thread;
let gzip = builder.gzip; let gzip = builder.gzip;
let timeout = builder.timeout; let timeout = builder.timeout;
let mut builder = async_impl::client::take_builder(&mut builder.inner); let mut builder = async_impl::client::take_builder(&mut builder.inner);
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
let (spawn_tx, spawn_rx) = oneshot::channel::<::Result<()>>(); let (spawn_tx, spawn_rx) = oneshot::channel::<::Result<()>>();
try_!(thread::Builder::new().name("reqwest-internal-sync-core".into()).spawn(move || { let handle = try_!(thread::Builder::new().name("reqwest-internal-sync-core".into()).spawn(move || {
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
let built = (|| { let built = (|| {
@@ -364,10 +376,16 @@ impl ClientHandle {
wait::timeout(spawn_rx, timeout).expect("core thread cancelled")?; wait::timeout(spawn_rx, timeout).expect("core thread cancelled")?;
let inner_handle = Arc::new(InnerClientHandle {
tx: Some(tx),
thread: Some(handle)
});
Ok(ClientHandle { Ok(ClientHandle {
gzip: gzip, gzip: gzip,
timeout: timeout, timeout: timeout,
tx: Arc::new(tx), inner: inner_handle,
}) })
} }
@@ -375,7 +393,7 @@ impl ClientHandle {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let (req, body) = request::async(req); let (req, body) = request::async(req);
let url = req.url().clone(); let url = req.url().clone();
self.tx.send((req, tx)).expect("core thread panicked"); self.inner.tx.as_ref().expect("core thread exited early").send((req, tx)).expect("core thread panicked");
if let Some(body) = body { if let Some(body) = body {
try_!(body.send(), &url); try_!(body.send(), &url);
@@ -394,11 +412,11 @@ impl ClientHandle {
} }
}; };
res.map(|res| { res.map(|res| {
response::new(res, self.gzip, self.timeout, KeepCoreThreadAlive(self.tx.clone())) response::new(res, self.gzip, self.timeout, KeepCoreThreadAlive(self.inner.clone()))
}) })
} }
} }
// pub(crate) // pub(crate)
pub struct KeepCoreThreadAlive(Arc<ThreadSender>); pub struct KeepCoreThreadAlive(Arc<InnerClientHandle>);