Merge pull request #457 from reem/scoped-to-spawn

fix(server): Use thread::spawn instead of thread::scoped.
This commit is contained in:
Sean McArthur
2015-04-15 13:18:13 -07:00
3 changed files with 28 additions and 30 deletions

View File

@@ -6,8 +6,8 @@ cache:
- target - target
script: script:
- cargo build - cargo build --features nightly
- cargo test - cargo test --features nightly
- cargo bench --features nightly - cargo bench --features nightly
after_success: | after_success: |

View File

@@ -1,12 +1,13 @@
use std::thread::{self, JoinGuard}; use std::sync::{Arc, mpsc};
use std::sync::mpsc; use std::thread;
use net::NetworkListener; use net::NetworkListener;
pub struct ListenerPool<A: NetworkListener> { pub struct ListenerPool<A: NetworkListener> {
acceptor: A acceptor: A
} }
impl<'a, A: NetworkListener + Send + 'a> ListenerPool<A> { impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
/// Create a thread pool to manage the acceptor. /// Create a thread pool to manage the acceptor.
pub fn new(acceptor: A) -> ListenerPool<A> { pub fn new(acceptor: A) -> ListenerPool<A> {
ListenerPool { acceptor: acceptor } ListenerPool { acceptor: acceptor }
@@ -18,31 +19,32 @@ impl<'a, A: NetworkListener + Send + 'a> ListenerPool<A> {
/// ///
/// Panics if threads == 0. /// Panics if threads == 0.
pub fn accept<F>(self, work: F, threads: usize) pub fn accept<F>(self, work: F, threads: usize)
where F: Fn(A::Stream) + Send + Sync + 'a { where F: Fn(A::Stream) + Send + Sync + 'static {
assert!(threads != 0, "Can't accept on 0 threads."); assert!(threads != 0, "Can't accept on 0 threads.");
let (super_tx, supervisor_rx) = mpsc::channel(); let (super_tx, supervisor_rx) = mpsc::channel();
let work = &work; let work = Arc::new(work);
let spawn = move |id| {
spawn_with(super_tx.clone(), work, self.acceptor.clone(), id)
};
// Go // Begin work.
let mut guards: Vec<_> = (0..threads).map(|id| spawn(id)).collect(); for _ in (0..threads) {
spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())
}
for id in supervisor_rx.iter() { // Monitor for panics.
guards[id] = spawn(id); // FIXME(reem): This won't ever exit since we still have a super_tx handle.
for _ in supervisor_rx.iter() {
spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone());
} }
} }
} }
fn spawn_with<'a, A, F>(supervisor: mpsc::Sender<usize>, work: &'a F, mut acceptor: A, id: usize) -> thread::JoinGuard<'a, ()> fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
where A: NetworkListener + Send + 'a, where A: NetworkListener + Send + 'static,
F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'a { F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static {
thread::scoped(move || { thread::spawn(move || {
let _sentinel = Sentinel::new(supervisor, id); let _sentinel = Sentinel::new(supervisor, ());
loop { loop {
match acceptor.accept() { match acceptor.accept() {
@@ -52,13 +54,12 @@ where A: NetworkListener + Send + 'a,
} }
} }
} }
}) });
} }
struct Sentinel<T: Send + 'static> { struct Sentinel<T: Send + 'static> {
value: Option<T>, value: Option<T>,
supervisor: mpsc::Sender<T>, supervisor: mpsc::Sender<T>,
//active: bool
} }
impl<T: Send + 'static> Sentinel<T> { impl<T: Send + 'static> Sentinel<T> {
@@ -66,18 +67,12 @@ impl<T: Send + 'static> Sentinel<T> {
Sentinel { Sentinel {
value: Some(data), value: Some(data),
supervisor: channel, supervisor: channel,
//active: true
} }
} }
//fn cancel(mut self) { self.active = false; }
} }
impl<T: Send + 'static> Drop for Sentinel<T> { impl<T: Send + 'static> Drop for Sentinel<T> {
fn drop(&mut self) { fn drop(&mut self) {
// If we were cancelled, get out of here.
//if !self.active { return; }
// Respawn ourselves // Respawn ourselves
let _ = self.supervisor.send(self.value.take().unwrap()); let _ = self.supervisor.send(self.value.take().unwrap());
} }

View File

@@ -3,7 +3,7 @@ use std::io::{BufWriter, Write};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
use std::path::Path; use std::path::Path;
use std::thread::{self, JoinGuard}; use std::thread::{self, JoinHandle};
use num_cpus; use num_cpus;
@@ -104,7 +104,7 @@ S: NetworkStream + Clone + Send> Server<'a, H, L> {
let pool = ListenerPool::new(listener.clone()); let pool = ListenerPool::new(listener.clone());
let work = move |mut stream| handle_connection(&mut stream, &handler); let work = move |mut stream| handle_connection(&mut stream, &handler);
let guard = thread::scoped(move || pool.accept(work, threads)); let guard = thread::spawn(move || pool.accept(work, threads));
Ok(Listening { Ok(Listening {
_guard: guard, _guard: guard,
@@ -175,7 +175,10 @@ where S: NetworkStream + Clone, H: Handler {
/// A listening server, which can later be closed. /// A listening server, which can later be closed.
pub struct Listening { pub struct Listening {
_guard: JoinGuard<'static, ()>, #[cfg(feature = "nightly")]
_guard: JoinHandle<()>,
#[cfg(not(feature = "nightly"))]
_guard: JoinHandle,
/// The socket addresses that the server is bound to. /// The socket addresses that the server is bound to.
pub socket: SocketAddr, pub socket: SocketAddr,
} }