From c2d9e343767437b6db746914acfbb0847707bbc0 Mon Sep 17 00:00:00 2001 From: Jonathan Reem Date: Mon, 8 Sep 2014 20:29:28 -0700 Subject: [PATCH 1/3] Change Server to contain a Vec of (ip, port) pairs to allow repeat listening Expose Server::many for creating a Server that will listen on many (ip, port) pairs. Handler still receives a simple Iterator of (Request, Response) pairs. This is a breaking change since it changes the representation of Listener, but Handler and Server::http are unchanged in their API. Fixes #7 --- Cargo.toml | 6 +++ src/lib.rs | 2 + src/server/mod.rs | 103 +++++++++++++++++++++++++++++----------------- 3 files changed, 73 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b6e2c24e..afca46b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,12 @@ git = "https://github.com/seanmonstar/mime.rs" [dependencies.unsafe-any] git = "https://github.com/reem/rust-unsafe-any" +[dependencies.intertwine] +git = "https://github.com/reem/rust-intertwine" + +[dependencies.move-acceptor] +git = "https://github.com/reem/rust-move-acceptor" + [dev-dependencies.curl] git = "https://github.com/carllerche/curl-rust" diff --git a/src/lib.rs b/src/lib.rs index 6daec19a..c74b3c61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,8 @@ extern crate url; #[phase(plugin,link)] extern crate log; #[cfg(test)] extern crate test; extern crate "unsafe-any" as uany; +extern crate "move-acceptor" as macceptor; +extern crate intertwine; pub use std::io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr, Port}; pub use mimewrapper::mime; diff --git a/src/server/mod.rs b/src/server/mod.rs index 5afbbf95..b061a12d 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,11 +1,16 @@ //! HTTP Server -use std::io::{Acceptor, Listener, IoResult, EndOfFile, IncomingConnections}; +use std::io::{Listener, IoResult, EndOfFile}; use std::io::net::ip::{IpAddr, Port, SocketAddr}; +use intertwine::{Intertwine, Intertwined}; +use macceptor::MoveAcceptor; + pub use self::request::Request; pub use self::response::{Response, Fresh, Streaming}; -use net::{NetworkListener, NetworkAcceptor, NetworkStream, HttpAcceptor, HttpListener}; +use net::{NetworkListener, NetworkAcceptor, NetworkStream, HttpAcceptor, HttpListener, HttpStream}; + +use {HttpResult}; pub mod request; pub mod response; @@ -15,55 +20,71 @@ pub mod response; /// Once listening, it will create a `Request`/`Response` pair for each /// incoming connection, and hand them to the provided handler. pub struct Server { - ip: IpAddr, - port: Port + pairs: Vec<(IpAddr, Port)> } +macro_rules! try_option( + ($e:expr) => {{ + match $e { + Some(v) => v, + None => return None + } + }} +) + impl Server { /// Creates a new server that will handle `HttpStream`s. pub fn http(ip: IpAddr, port: Port) -> Server { - Server { - ip: ip, - port: port - } + Server { pairs: vec![(ip, port)] } + } + + /// Creates a server that can listen to many (ip, port) pairs. + pub fn many(pairs: Vec<(IpAddr, Port)>) -> Server { + Server { pairs: pairs } } } impl, S: NetworkStream, A: NetworkAcceptor> Server { - /// Creates a server that can listen for and handle `NetworkStreams`. - pub fn new(ip: IpAddr, port: Port) -> Server { - Server { - ip: ip, - port: port - } - } - /// Binds to a socket, and starts handling connections. - pub fn listen>(self, handler: H) -> IoResult> { - let mut listener: L = try!(NetworkListener::bind(self.ip.to_string().as_slice(), self.port)); - let socket = try!(listener.socket_name()); - let acceptor = try!(listener.listen()); - let mut worker = acceptor.clone(); + pub fn listen(self, handler: H) -> HttpResult> + where S: NetworkStream, + A: NetworkAcceptor, + H: Handler, + L: NetworkListener { + let mut acceptors = Vec::new(); + let mut sockets = Vec::new(); + for (ip, port) in self.pairs.move_iter() { + let mut listener: L = try_io!(NetworkListener::::bind(ip.to_string().as_slice(), port)); + + sockets.push(try_io!(listener.socket_name())); + + let acceptor = try_io!(listener.listen()); + acceptors.push(acceptor.clone()); + } + + let connections = acceptors.clone() + .move_iter() + .map(|acceptor| acceptor.move_incoming()) + .intertwine(); spawn(proc() { - handler.handle(Incoming { from: worker.incoming() }); + handler.handle(Incoming { from: connections }); }); Ok(Listening { - acceptor: acceptor, - socket_addr: socket, + acceptors: acceptors, + sockets: sockets, }) } - } /// An iterator over incoming connections, represented as pairs of /// hyper Requests and Responses. -pub struct Incoming<'a, A: 'a = HttpAcceptor> { - from: IncomingConnections<'a, A> +pub struct Incoming { + from: Intertwined> } -impl<'a, S: NetworkStream, A: NetworkAcceptor> Iterator<(Request, Response)> for Incoming<'a, A> { +impl Iterator<(Request, Response)> for Incoming { fn next(&mut self) -> Option<(Request, Response)> { for conn in self.from { match conn { @@ -93,17 +114,23 @@ impl<'a, S: NetworkStream, A: NetworkAcceptor> Iterator<(Request, Response { - acceptor: A, - /// The socket address that the server is bound to. - pub socket_addr: SocketAddr, +pub struct Listening { + acceptors: Vec, + /// The socket addresses that the server is bound to. + pub sockets: Vec, } impl, S: NetworkStream> Listening { - /// Stop the server from listening to it's socket address. - pub fn close(mut self) -> IoResult<()> { + /// Stop the server from listening to all of its socket addresses. + /// + /// If closing any of the servers acceptors fails, this function returns Err + /// and does not close the rest of the acceptors. + pub fn close(&mut self) -> HttpResult<()> { debug!("closing server"); - self.acceptor.close() + for acceptor in self.acceptors.mut_iter() { + try_io!(acceptor.close()); + } + Ok(()) } } @@ -112,11 +139,11 @@ pub trait Handler, S: NetworkStream>: Send { /// Receives a `Request`/`Response` pair, and should perform some action on them. /// /// This could reading from the request, and writing to the response. - fn handle(self, Incoming); + fn handle(self, Incoming); } -impl, S: NetworkStream> Handler for fn(Incoming) { - fn handle(self, incoming: Incoming) { +impl, S: NetworkStream> Handler for fn(Incoming) { + fn handle(self, incoming: Incoming) { (self)(incoming) } } From 0c674a13768281ffbbce7af74dacdf8332b2d2e8 Mon Sep 17 00:00:00 2001 From: Jonathan Reem Date: Mon, 8 Sep 2014 20:36:02 -0700 Subject: [PATCH 2/3] Update client benchmarks for new Listening representation. --- benches/client.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/benches/client.rs b/benches/client.rs index ad92b985..af7aef47 100644 --- a/benches/client.rs +++ b/benches/client.rs @@ -34,8 +34,8 @@ fn handle(mut incoming: Incoming) { #[bench] fn bench_curl(b: &mut test::Bencher) { - let listening = listen(); - let s = format!("http://{}/", listening.socket_addr); + let mut listening = listen(); + let s = format!("http://{}/", listening.sockets[0]); let url = s.as_slice(); b.iter(|| { curl::http::handle() @@ -63,8 +63,8 @@ impl hyper::header::Header for Foo { #[bench] fn bench_hyper(b: &mut test::Bencher) { - let listening = listen(); - let s = format!("http://{}/", listening.socket_addr); + let mut listening = listen(); + let s = format!("http://{}/", listening.sockets[0]); let url = s.as_slice(); b.iter(|| { let mut req = hyper::get(hyper::Url::parse(url).unwrap()).unwrap(); @@ -79,8 +79,8 @@ fn bench_hyper(b: &mut test::Bencher) { #[bench] fn bench_http(b: &mut test::Bencher) { - let listening = listen(); - let s = format!("http://{}/", listening.socket_addr); + let mut listening = listen(); + let s = format!("http://{}/", listening.sockets[0]); let url = s.as_slice(); b.iter(|| { let mut req: http::client::RequestWriter = http::client::RequestWriter::new( From cfd5cf3c68b0c6dfceb2874b5a41798a3f545fb6 Mon Sep 17 00:00:00 2001 From: Jonathan Reem Date: Wed, 10 Sep 2014 12:13:18 -0700 Subject: [PATCH 3/3] Split Server::listen into two methods to hack around ICE related to default type params Trying to default the type parameters leads to an ICE and strange type errors. I think this is just due to the experimental state of default type params and this change can be rolled back when they are fixed. --- benches/client.rs | 11 +++++------ benches/server.rs | 4 ++-- examples/concurrent-server.rs | 2 +- src/server/mod.rs | 16 ++++++++++++---- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/benches/client.rs b/benches/client.rs index af7aef47..805b3daa 100644 --- a/benches/client.rs +++ b/benches/client.rs @@ -8,9 +8,8 @@ extern crate test; use std::fmt::{mod, Show}; use std::io::net::ip::Ipv4Addr; use hyper::server::{Incoming, Server}; -use hyper::net::HttpAcceptor; -fn listen() -> hyper::server::Listening { +fn listen() -> hyper::server::Listening { let server = Server::http(Ipv4Addr(127, 0, 0, 1), 0); server.listen(handle).unwrap() } @@ -70,11 +69,11 @@ fn bench_hyper(b: &mut test::Bencher) { let mut req = hyper::get(hyper::Url::parse(url).unwrap()).unwrap(); req.headers.set(Foo); - req - .send().unwrap() - .read_to_string().unwrap() + req + .send().unwrap() + .read_to_string().unwrap() }); - listening.close().unwrap() +listening.close().unwrap() } #[bench] diff --git a/benches/server.rs b/benches/server.rs index de9c3e8f..d3ce62f4 100644 --- a/benches/server.rs +++ b/benches/server.rs @@ -28,9 +28,9 @@ fn hyper_handle(mut incoming: hyper::server::Incoming) { #[bench] fn bench_hyper(b: &mut Bencher) { let server = hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 0); - let listener = server.listen(hyper_handle).unwrap(); + let mut listener = server.listen(hyper_handle).unwrap(); - let url = hyper::Url::parse(format!("http://{}", listener.socket_addr).as_slice()).unwrap(); + let url = hyper::Url::parse(format!("http://{}", listener.sockets[0]).as_slice()).unwrap(); b.iter(|| request(url.clone())); listener.close().unwrap(); } diff --git a/examples/concurrent-server.rs b/examples/concurrent-server.rs index 740312fb..3fe9f40a 100644 --- a/examples/concurrent-server.rs +++ b/examples/concurrent-server.rs @@ -19,7 +19,7 @@ trait ConcurrentHandler: Send + Sync { struct Concurrent { handler: Arc } impl Handler for Concurrent { - fn handle(self, mut incoming: Incoming) { + fn handle(self, mut incoming: Incoming) { for (mut req, mut res) in incoming { let clone = self.handler.clone(); spawn(proc() { clone.handle(req, res) }) diff --git a/src/server/mod.rs b/src/server/mod.rs index b061a12d..d0c92a45 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -46,11 +46,14 @@ impl Server { impl, S: NetworkStream, A: NetworkAcceptor> Server { /// Binds to a socket, and starts handling connections. - pub fn listen(self, handler: H) -> HttpResult> - where S: NetworkStream, + /// + /// This method has unbound type parameters, so can be used when you want to use + /// something other than the provided HttpStream, HttpAcceptor, and HttpListener. + pub fn listen_network(self, handler: H) -> HttpResult> + where H: Handler, + S: NetworkStream, A: NetworkAcceptor, - H: Handler, - L: NetworkListener { + L: NetworkListener, { let mut acceptors = Vec::new(); let mut sockets = Vec::new(); for (ip, port) in self.pairs.move_iter() { @@ -76,6 +79,11 @@ impl, S: NetworkStream, A: NetworkAcceptor> Server>(self, handler: H) -> HttpResult> { + self.listen_network::(handler) + } } /// An iterator over incoming connections, represented as pairs of