feat(server): Server::new can take one or more listeners

Closes #859
This commit is contained in:
Ed Barnard
2016-07-18 23:32:38 +01:00
committed by Sean McArthur
parent a22ae26cec
commit d67dbc6028
3 changed files with 114 additions and 32 deletions

View File

@@ -1,6 +1,7 @@
//! A collection of traits abstracting over Listeners and Streams.
use std::io::{self, Read, Write};
use std::net::{SocketAddr};
use std::option;
use rotor::mio::tcp::{TcpStream, TcpListener};
use rotor::mio::{Selector, Token, Evented, EventSet, PollOpt, TryAccept};
@@ -168,6 +169,15 @@ impl Evented for HttpListener {
}
}
impl IntoIterator for HttpListener {
type Item = Self;
type IntoIter = option::IntoIter<Self>;
fn into_iter(self) -> Self::IntoIter {
Some(self).into_iter()
}
}
/// Deprecated
///
/// Use `SslClient` and `SslServer` instead.
@@ -390,6 +400,15 @@ impl<S: SslServer> Evented for HttpsListener<S> {
}
}
impl<S: SslServer> IntoIterator for HttpsListener<S> {
type Item = Self;
type IntoIter = option::IntoIter<Self>;
fn into_iter(self) -> Self::IntoIter {
Some(self).into_iter()
}
}
fn _assert_transport() {
fn _assert<T: Transport>() {}
_assert::<HttpsStream<HttpStream>>();

View File

@@ -37,19 +37,26 @@ impl<A: Accept, H: HandlerFactory<A::Output>> fmt::Debug for ServerLoop<A, H> {
/// A Server that can accept incoming network requests.
#[derive(Debug)]
pub struct Server<T: Accept> {
listener: T,
pub struct Server<A> {
lead_listener: A,
other_listeners: Vec<A>,
keep_alive: bool,
idle_timeout: Option<Duration>,
max_sockets: usize,
}
impl<T> Server<T> where T: Accept, T::Output: Transport {
/// Creates a new server with the provided Listener.
#[inline]
pub fn new(listener: T) -> Server<T> {
impl<A: Accept> Server<A> {
/// Creates a new Server from one or more Listeners.
///
/// Panics if listeners is an empty iterator.
pub fn new<I: IntoIterator<Item = A>>(listeners: I) -> Server<A> {
let mut listeners = listeners.into_iter();
let lead_listener = listeners.next().expect("Server::new requires at least 1 listener");
let other_listeners = listeners.collect::<Vec<_>>();
Server {
listener: listener,
lead_listener: lead_listener,
other_listeners: other_listeners,
keep_alive: true,
idle_timeout: Some(Duration::from_secs(10)),
max_sockets: 4096,
@@ -59,7 +66,7 @@ impl<T> Server<T> where T: Accept, T::Output: Transport {
/// Enables or disables HTTP keep-alive.
///
/// Default is true.
pub fn keep_alive(mut self, val: bool) -> Server<T> {
pub fn keep_alive(mut self, val: bool) -> Server<A> {
self.keep_alive = val;
self
}
@@ -67,7 +74,7 @@ impl<T> Server<T> where T: Accept, T::Output: Transport {
/// Sets how long an idle connection will be kept before closing.
///
/// Default is 10 seconds.
pub fn idle_timeout(mut self, val: Option<Duration>) -> Server<T> {
pub fn idle_timeout(mut self, val: Option<Duration>) -> Server<A> {
self.idle_timeout = val;
self
}
@@ -75,7 +82,7 @@ impl<T> Server<T> where T: Accept, T::Output: Transport {
/// Sets the maximum open sockets for this Server.
///
/// Default is 4096, but most servers can handle much more than this.
pub fn max_sockets(mut self, val: usize) -> Server<T> {
pub fn max_sockets(mut self, val: usize) -> Server<A> {
self.max_sockets = val;
self
}
@@ -105,13 +112,11 @@ impl<S: SslServer> Server<HttpsListener<S>> {
}
impl<A: Accept> Server<A> where A::Output: Transport {
impl<A: Accept> Server<A> {
/// Binds to a socket and starts handling connections.
pub fn handle<H>(self, factory: H) -> ::Result<(Listening, ServerLoop<A, H>)>
where H: HandlerFactory<A::Output> {
let addr = try!(self.listener.local_addr());
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_rx = shutdown.clone();
let mut config = rotor::Config::new();
config.slab_capacity(self.max_sockets);
@@ -119,19 +124,36 @@ impl<A: Accept> Server<A> where A::Output: Transport {
let keep_alive = self.keep_alive;
let idle_timeout = self.idle_timeout;
let mut loop_ = rotor::Loop::new(&config).unwrap();
let mut addrs = Vec::with_capacity(1 + self.other_listeners.len());
// Add the lead listener. This one handles shutdown messages.
let mut notifier = None;
{
let notifier = &mut notifier;
let listener = self.lead_listener;
addrs.push(try!(listener.local_addr()));
let shutdown_rx = shutdown.clone();
loop_.add_machine_with(move |scope| {
*notifier = Some(scope.notifier());
rotor_try!(scope.register(&self.listener, EventSet::readable(), PollOpt::level()));
rotor::Response::ok(ServerFsm::Listener::<A, H>(self.listener, shutdown_rx))
rotor_try!(scope.register(&listener, EventSet::readable(), PollOpt::level()));
rotor::Response::ok(ServerFsm::Listener(listener, shutdown_rx))
}).unwrap();
}
let notifier = notifier.expect("loop.add_machine failed");
// Add the other listeners.
for listener in self.other_listeners {
addrs.push(try!(listener.local_addr()));
let shutdown_rx = shutdown.clone();
loop_.add_machine_with(move |scope| {
rotor_try!(scope.register(&listener, EventSet::readable(), PollOpt::level()));
rotor::Response::ok(ServerFsm::Listener(listener, shutdown_rx))
}).unwrap();
}
let listening = Listening {
addr: addr,
addrs: addrs,
shutdown: (shutdown, notifier),
};
let server = ServerLoop {
@@ -299,14 +321,14 @@ where A: Accept,
/// A handle of the running server.
pub struct Listening {
addr: SocketAddr,
addrs: Vec<SocketAddr>,
shutdown: (Arc<AtomicBool>, rotor::Notifier),
}
impl fmt::Debug for Listening {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Listening")
.field("addr", &self.addr)
.field("addrs", &self.addrs)
.field("closed", &self.shutdown.0.load(Ordering::Relaxed))
.finish()
}
@@ -314,14 +336,20 @@ impl fmt::Debug for Listening {
impl fmt::Display for Listening {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.addr, f)
for (i, addr) in self.addrs().iter().enumerate() {
if i > 1 {
try!(f.write_str(", "));
}
try!(fmt::Display::fmt(addr, f));
}
Ok(())
}
}
impl Listening {
/// The address this server is listening on.
pub fn addr(&self) -> &SocketAddr {
&self.addr
/// The addresses this server is listening on.
pub fn addrs(&self) -> &[SocketAddr] {
&self.addrs
}
/// Stop the server from listening to its socket address.
@@ -375,8 +403,3 @@ where F: FnMut(http::Control) -> H, H: Handler<T>, T: Transport {
self(ctrl)
}
}
#[cfg(test)]
mod tests {
}

View File

@@ -7,7 +7,7 @@ use std::sync::mpsc;
use std::time::Duration;
use hyper::{Next, Encoder, Decoder};
use hyper::net::HttpStream;
use hyper::net::{HttpListener, HttpStream};
use hyper::server::{Server, Handler, Request, Response};
struct Serve {
@@ -17,8 +17,14 @@ struct Serve {
}
impl Serve {
fn addrs(&self) -> &[SocketAddr] {
self.listening.as_ref().unwrap().addrs()
}
fn addr(&self) -> &SocketAddr {
self.listening.as_ref().unwrap().addr()
let addrs = self.addrs();
assert!(addrs.len() == 1);
&addrs[0]
}
/*
@@ -161,11 +167,22 @@ fn serve() -> Serve {
}
fn serve_with_timeout(dur: Option<Duration>) -> Serve {
serve_n_with_timeout(1, dur)
}
fn serve_n(n: u32) -> Serve {
serve_n_with_timeout(n, None)
}
fn serve_n_with_timeout(n: u32, dur: Option<Duration>) -> Serve {
use std::thread;
let (msg_tx, msg_rx) = mpsc::channel();
let (reply_tx, reply_rx) = mpsc::channel();
let (listening, server) = Server::http(&"127.0.0.1:0".parse().unwrap()).unwrap()
let addr = "127.0.0.1:0".parse().unwrap();
let listeners = (0..n).map(|_| HttpListener::bind(&addr).unwrap());
let (listening, server) = Server::new(listeners)
.handle(move |_| {
let mut replies = Vec::new();
while let Ok(reply) = reply_rx.try_recv() {
@@ -180,7 +197,7 @@ fn serve_with_timeout(dur: Option<Duration>) -> Serve {
}).unwrap();
let thread_name = format!("test-server-{}: {:?}", listening.addr(), dur);
let thread_name = format!("test-server-{}: {:?}", listening, dur);
thread::Builder::new().name(thread_name).spawn(move || {
server.run();
}).unwrap();
@@ -439,3 +456,26 @@ fn server_keep_alive() {
}
}
}
#[test]
fn server_get_with_body_three_listeners() {
let server = serve_n(3);
let addrs = server.addrs();
assert_eq!(addrs.len(), 3);
for (i, addr) in addrs.iter().enumerate() {
let mut req = TcpStream::connect(addr).unwrap();
write!(req, "\
GET / HTTP/1.1\r\n\
Host: example.domain\r\n\
Content-Length: 17\r\n\
\r\n\
I'm sending to {}.\r\n\
", i).unwrap();
req.read(&mut [0; 256]).unwrap();
// note: doesnt include trailing \r\n, cause Content-Length wasn't 19
let comparison = format!("I'm sending to {}.", i).into_bytes();
assert_eq!(server.body(), comparison);
}
}