fix(server): Use thread::spawn instead of thread::scoped.
This commit is contained in:
		| @@ -6,8 +6,8 @@ cache: | |||||||
|       - target |       - target | ||||||
|  |  | ||||||
| script: | script: | ||||||
|     - cargo build |     - cargo build --features nightly | ||||||
|     - cargo test |     - cargo test --features nightly | ||||||
|     - cargo bench --features nightly |     - cargo bench --features nightly | ||||||
|  |  | ||||||
| after_success: | | after_success: | | ||||||
|   | |||||||
| @@ -1,12 +1,13 @@ | |||||||
| use std::thread::{self, JoinGuard}; | use std::sync::{Arc, mpsc}; | ||||||
| use std::sync::mpsc; | use std::thread; | ||||||
|  |  | ||||||
| use net::NetworkListener; | use net::NetworkListener; | ||||||
|  |  | ||||||
| pub struct ListenerPool<A: NetworkListener> { | pub struct ListenerPool<A: NetworkListener> { | ||||||
|     acceptor: A |     acceptor: A | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<'a, A: NetworkListener + Send + 'a> ListenerPool<A> { | impl<A: NetworkListener + Send + 'static> ListenerPool<A> { | ||||||
|     /// Create a thread pool to manage the acceptor. |     /// Create a thread pool to manage the acceptor. | ||||||
|     pub fn new(acceptor: A) -> ListenerPool<A> { |     pub fn new(acceptor: A) -> ListenerPool<A> { | ||||||
|         ListenerPool { acceptor: acceptor } |         ListenerPool { acceptor: acceptor } | ||||||
| @@ -18,31 +19,32 @@ impl<'a, A: NetworkListener + Send + 'a> ListenerPool<A> { | |||||||
|     /// |     /// | ||||||
|     /// Panics if threads == 0. |     /// Panics if threads == 0. | ||||||
|     pub fn accept<F>(self, work: F, threads: usize) |     pub fn accept<F>(self, work: F, threads: usize) | ||||||
|         where F: Fn(A::Stream) + Send + Sync + 'a { |         where F: Fn(A::Stream) + Send + Sync + 'static { | ||||||
|         assert!(threads != 0, "Can't accept on 0 threads."); |         assert!(threads != 0, "Can't accept on 0 threads."); | ||||||
|  |  | ||||||
|         let (super_tx, supervisor_rx) = mpsc::channel(); |         let (super_tx, supervisor_rx) = mpsc::channel(); | ||||||
|  |  | ||||||
|         let work = &work; |         let work = Arc::new(work); | ||||||
|         let spawn = move |id| { |  | ||||||
|             spawn_with(super_tx.clone(), work, self.acceptor.clone(), id) |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         // Go |         // Begin work. | ||||||
|         let mut guards: Vec<_> = (0..threads).map(|id| spawn(id)).collect(); |         for _ in (0..threads) { | ||||||
|  |             spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone()) | ||||||
|  |         } | ||||||
|  |  | ||||||
|         for id in supervisor_rx.iter() { |         // Monitor for panics. | ||||||
|             guards[id] = spawn(id); |         // FIXME(reem): This won't ever exit since we still have a super_tx handle. | ||||||
|  |         for _ in supervisor_rx.iter() { | ||||||
|  |             spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone()); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| fn spawn_with<'a, A, F>(supervisor: mpsc::Sender<usize>, work: &'a F, mut acceptor: A, id: usize) -> thread::JoinGuard<'a, ()> | fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A) | ||||||
| where A: NetworkListener + Send + 'a, | where A: NetworkListener + Send + 'static, | ||||||
|       F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'a { |       F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static { | ||||||
|  |  | ||||||
|     thread::scoped(move || { |     thread::spawn(move || { | ||||||
|         let _sentinel = Sentinel::new(supervisor, id); |         let _sentinel = Sentinel::new(supervisor, ()); | ||||||
|  |  | ||||||
|         loop { |         loop { | ||||||
|             match acceptor.accept() { |             match acceptor.accept() { | ||||||
| @@ -52,13 +54,12 @@ where A: NetworkListener + Send + 'a, | |||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     }) |     }); | ||||||
| } | } | ||||||
|  |  | ||||||
| struct Sentinel<T: Send + 'static> { | struct Sentinel<T: Send + 'static> { | ||||||
|     value: Option<T>, |     value: Option<T>, | ||||||
|     supervisor: mpsc::Sender<T>, |     supervisor: mpsc::Sender<T>, | ||||||
|     //active: bool |  | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<T: Send + 'static> Sentinel<T> { | impl<T: Send + 'static> Sentinel<T> { | ||||||
| @@ -66,18 +67,12 @@ impl<T: Send + 'static> Sentinel<T> { | |||||||
|         Sentinel { |         Sentinel { | ||||||
|             value: Some(data), |             value: Some(data), | ||||||
|             supervisor: channel, |             supervisor: channel, | ||||||
|             //active: true |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     //fn cancel(mut self) { self.active = false; } |  | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<T: Send + 'static> Drop for Sentinel<T> { | impl<T: Send + 'static> Drop for Sentinel<T> { | ||||||
|     fn drop(&mut self) { |     fn drop(&mut self) { | ||||||
|         // If we were cancelled, get out of here. |  | ||||||
|         //if !self.active { return; } |  | ||||||
|  |  | ||||||
|         // Respawn ourselves |         // Respawn ourselves | ||||||
|         let _ = self.supervisor.send(self.value.take().unwrap()); |         let _ = self.supervisor.send(self.value.take().unwrap()); | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -3,7 +3,7 @@ use std::io::{BufWriter, Write}; | |||||||
| use std::marker::PhantomData; | use std::marker::PhantomData; | ||||||
| use std::net::{SocketAddr, ToSocketAddrs}; | use std::net::{SocketAddr, ToSocketAddrs}; | ||||||
| use std::path::Path; | use std::path::Path; | ||||||
| use std::thread::{self, JoinGuard}; | use std::thread::{self, JoinHandle}; | ||||||
|  |  | ||||||
| use num_cpus; | use num_cpus; | ||||||
|  |  | ||||||
| @@ -104,7 +104,7 @@ S: NetworkStream + Clone + Send> Server<'a, H, L> { | |||||||
|         let pool = ListenerPool::new(listener.clone()); |         let pool = ListenerPool::new(listener.clone()); | ||||||
|         let work = move |mut stream| handle_connection(&mut stream, &handler); |         let work = move |mut stream| handle_connection(&mut stream, &handler); | ||||||
|  |  | ||||||
|         let guard = thread::scoped(move || pool.accept(work, threads)); |         let guard = thread::spawn(move || pool.accept(work, threads)); | ||||||
|  |  | ||||||
|         Ok(Listening { |         Ok(Listening { | ||||||
|             _guard: guard, |             _guard: guard, | ||||||
| @@ -175,7 +175,10 @@ where S: NetworkStream + Clone, H: Handler { | |||||||
|  |  | ||||||
| /// A listening server, which can later be closed. | /// A listening server, which can later be closed. | ||||||
| pub struct Listening { | pub struct Listening { | ||||||
|     _guard: JoinGuard<'static, ()>, |     #[cfg(feature = "nightly")] | ||||||
|  |     _guard: JoinHandle<()>, | ||||||
|  |     #[cfg(not(feature = "nightly"))] | ||||||
|  |     _guard: JoinHandle, | ||||||
|     /// The socket addresses that the server is bound to. |     /// The socket addresses that the server is bound to. | ||||||
|     pub socket: SocketAddr, |     pub socket: SocketAddr, | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user