feat(server): make AcceptorPool::accept() block and allow non'-static data

Change AcceptorPool to not spawn detached threads anymore. This,
together with the recent `Send` changes, allows the `work` closure to
close over non-`'static` data.

This doesn't change the high-level `Server` interface, because that
would make it's `listen` a blocking call (it's currently non-blocking)
- which would be a breaking change.
This commit is contained in:
Renato Zannon
2015-02-21 02:37:22 -02:00
committed by Sean McArthur
parent b47f936525
commit b0a72d80d0
2 changed files with 28 additions and 20 deletions

View File

@@ -1,13 +1,13 @@
use std::thread::{self, JoinGuard}; use std::thread::{self, JoinGuard};
use std::sync::Arc;
use std::sync::mpsc; use std::sync::mpsc;
use std::collections::VecMap;
use net::NetworkAcceptor; use net::NetworkAcceptor;
pub struct AcceptorPool<A: NetworkAcceptor> { pub struct AcceptorPool<A: NetworkAcceptor> {
acceptor: A acceptor: A
} }
impl<A: NetworkAcceptor + 'static> AcceptorPool<A> { impl<'a, A: NetworkAcceptor + 'a> AcceptorPool<A> {
/// Create a thread pool to manage the acceptor. /// Create a thread pool to manage the acceptor.
pub fn new(acceptor: A) -> AcceptorPool<A> { pub fn new(acceptor: A) -> AcceptorPool<A> {
AcceptorPool { acceptor: acceptor } AcceptorPool { acceptor: acceptor }
@@ -18,33 +18,39 @@ impl<A: NetworkAcceptor + 'static> AcceptorPool<A> {
/// ## Panics /// ## Panics
/// ///
/// Panics if threads == 0. /// Panics if threads == 0.
pub fn accept<F>(self, work: F, threads: usize) -> JoinGuard<'static, ()> pub fn accept<F>(self, work: F, threads: usize)
where F: Fn(A::Stream) + Send + Sync + 'static { where F: Fn(A::Stream) + Send + Sync + 'a {
assert!(threads != 0, "Can't accept on 0 threads."); assert!(threads != 0, "Can't accept on 0 threads.");
// Replace with &F when Send changes land.
let work = Arc::new(work);
let (super_tx, supervisor_rx) = mpsc::channel(); let (super_tx, supervisor_rx) = mpsc::channel();
let spawn = let counter = &mut 0;
move || spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone()); let work = &work;
let mut spawn = move || {
let id = *counter;
let guard = spawn_with(super_tx.clone(), work, self.acceptor.clone(), id);
*counter += 1;
(id, guard)
};
// Go // Go
for _ in 0..threads { spawn() } let mut guards: VecMap<_> = (0..threads).map(|_| spawn()).collect();
// Spawn the supervisor for id in supervisor_rx.iter() {
thread::scoped(move || for () in supervisor_rx.iter() { spawn() }) guards.remove(&id);
let (id, guard) = spawn();
guards.insert(id, guard);
}
} }
} }
fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A) fn spawn_with<'a, A, F>(supervisor: mpsc::Sender<usize>, work: &'a F, mut acceptor: A, id: usize) -> JoinGuard<'a, ()>
where A: NetworkAcceptor + 'static, where A: NetworkAcceptor + 'a,
F: Fn(<A as NetworkAcceptor>::Stream) + Send + Sync + 'static { F: Fn(<A as NetworkAcceptor>::Stream) + Send + Sync + 'a {
use std::old_io::EndOfFile; use std::old_io::EndOfFile;
thread::spawn(move || { thread::scoped(move || {
let sentinel = Sentinel::new(supervisor, ()); let sentinel = Sentinel::new(supervisor, id);
loop { loop {
match acceptor.accept() { match acceptor.accept() {
@@ -60,7 +66,7 @@ where A: NetworkAcceptor + 'static,
} }
} }
} }
}); })
} }
struct Sentinel<T: Send> { struct Sentinel<T: Send> {

View File

@@ -2,7 +2,7 @@
use std::old_io::{Listener, BufferedReader, BufferedWriter}; use std::old_io::{Listener, BufferedReader, BufferedWriter};
use std::old_io::net::ip::{IpAddr, Port, SocketAddr}; use std::old_io::net::ip::{IpAddr, Port, SocketAddr};
use std::os; use std::os;
use std::thread::JoinGuard; use std::thread::{self, JoinGuard};
pub use self::request::Request; pub use self::request::Request;
pub use self::response::Response; pub use self::response::Response;
@@ -77,8 +77,10 @@ S: NetworkStream + Clone + Send> Server<L> {
let pool = AcceptorPool::new(acceptor.clone()); let pool = AcceptorPool::new(acceptor.clone());
let work = move |stream| handle_connection(stream, &handler); let work = move |stream| handle_connection(stream, &handler);
let guard = thread::scoped(move || pool.accept(work, threads));
Ok(Listening { Ok(Listening {
_guard: pool.accept(work, threads), _guard: guard,
socket: socket, socket: socket,
acceptor: acceptor acceptor: acceptor
}) })