diff --git a/src/client.rs b/src/client.rs index c389951..a486b54 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,7 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; +use std::thread; use futures::{Future, Stream}; use futures::sync::{mpsc, oneshot}; @@ -313,21 +314,32 @@ impl fmt::Debug for ClientBuilder { struct ClientHandle { gzip: bool, timeout: Option, - tx: Arc + inner: Arc } type ThreadSender = mpsc::UnboundedSender<(async_impl::Request, oneshot::Sender<::Result>)>; +struct InnerClientHandle { + tx: Option, + thread: Option> +} + +impl Drop for InnerClientHandle { + fn drop(&mut self) { + self.tx.take(); + self.thread.take().map(|h| h.join()); + } +} + impl ClientHandle { fn new(builder: &mut ClientBuilder) -> ::Result { - use std::thread; let gzip = builder.gzip; let timeout = builder.timeout; let mut builder = async_impl::client::take_builder(&mut builder.inner); let (tx, rx) = mpsc::unbounded(); let (spawn_tx, spawn_rx) = oneshot::channel::<::Result<()>>(); - try_!(thread::Builder::new().name("reqwest-internal-sync-core".into()).spawn(move || { + let handle = try_!(thread::Builder::new().name("reqwest-internal-sync-core".into()).spawn(move || { use tokio_core::reactor::Core; let built = (|| { @@ -364,10 +376,16 @@ impl ClientHandle { wait::timeout(spawn_rx, timeout).expect("core thread cancelled")?; + let inner_handle = Arc::new(InnerClientHandle { + tx: Some(tx), + thread: Some(handle) + }); + + Ok(ClientHandle { gzip: gzip, timeout: timeout, - tx: Arc::new(tx), + inner: inner_handle, }) } @@ -375,7 +393,7 @@ impl ClientHandle { let (tx, rx) = oneshot::channel(); let (req, body) = request::async(req); let url = req.url().clone(); - self.tx.send((req, tx)).expect("core thread panicked"); + self.inner.tx.as_ref().expect("core thread exited early").send((req, tx)).expect("core thread panicked"); if let Some(body) = body { try_!(body.send(), &url); @@ -394,11 +412,11 @@ impl ClientHandle { } }; res.map(|res| { - response::new(res, self.gzip, self.timeout, KeepCoreThreadAlive(self.tx.clone())) + response::new(res, self.gzip, self.timeout, KeepCoreThreadAlive(self.inner.clone())) }) } } // pub(crate) -pub struct KeepCoreThreadAlive(Arc); +pub struct KeepCoreThreadAlive(Arc);