Merge pull request #188 from Roguelazer/issue-183-take-2

join on client thread when all clients have been dropped, take two
This commit is contained in:
Sean McArthur
2017-08-17 18:04:51 -07:00
committed by GitHub

View File

@@ -1,6 +1,7 @@
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use std::thread;
use futures::{Future, Stream};
use futures::sync::{mpsc, oneshot};
@@ -313,21 +314,32 @@ impl fmt::Debug for ClientBuilder {
struct ClientHandle {
gzip: bool,
timeout: Option<Duration>,
tx: Arc<ThreadSender>
inner: Arc<InnerClientHandle>
}
type ThreadSender = mpsc::UnboundedSender<(async_impl::Request, oneshot::Sender<::Result<async_impl::Response>>)>;
struct InnerClientHandle {
tx: Option<ThreadSender>,
thread: Option<thread::JoinHandle<()>>
}
impl Drop for InnerClientHandle {
fn drop(&mut self) {
self.tx.take();
self.thread.take().map(|h| h.join());
}
}
impl ClientHandle {
fn new(builder: &mut ClientBuilder) -> ::Result<ClientHandle> {
use std::thread;
let gzip = builder.gzip;
let timeout = builder.timeout;
let mut builder = async_impl::client::take_builder(&mut builder.inner);
let (tx, rx) = mpsc::unbounded();
let (spawn_tx, spawn_rx) = oneshot::channel::<::Result<()>>();
try_!(thread::Builder::new().name("reqwest-internal-sync-core".into()).spawn(move || {
let handle = try_!(thread::Builder::new().name("reqwest-internal-sync-core".into()).spawn(move || {
use tokio_core::reactor::Core;
let built = (|| {
@@ -364,10 +376,16 @@ impl ClientHandle {
wait::timeout(spawn_rx, timeout).expect("core thread cancelled")?;
let inner_handle = Arc::new(InnerClientHandle {
tx: Some(tx),
thread: Some(handle)
});
Ok(ClientHandle {
gzip: gzip,
timeout: timeout,
tx: Arc::new(tx),
inner: inner_handle,
})
}
@@ -375,7 +393,7 @@ impl ClientHandle {
let (tx, rx) = oneshot::channel();
let (req, body) = request::async(req);
let url = req.url().clone();
self.tx.send((req, tx)).expect("core thread panicked");
self.inner.tx.as_ref().expect("core thread exited early").send((req, tx)).expect("core thread panicked");
if let Some(body) = body {
try_!(body.send(), &url);
@@ -394,11 +412,11 @@ impl ClientHandle {
}
};
res.map(|res| {
response::new(res, self.gzip, self.timeout, KeepCoreThreadAlive(self.tx.clone()))
response::new(res, self.gzip, self.timeout, KeepCoreThreadAlive(self.inner.clone()))
})
}
}
// pub(crate)
pub struct KeepCoreThreadAlive(Arc<ThreadSender>);
pub struct KeepCoreThreadAlive(Arc<InnerClientHandle>);