Merge pull request #113 from reem/remove-intertwining

Removes intertwining, since it is slow and showing up in profiles
This commit is contained in:
Sean McArthur
2014-11-10 17:53:49 -08:00
5 changed files with 29 additions and 46 deletions

View File

@@ -16,9 +16,6 @@ git = "https://github.com/hyperium/mime.rs"
[dependencies.unsafe-any] [dependencies.unsafe-any]
git = "https://github.com/reem/rust-unsafe-any" git = "https://github.com/reem/rust-unsafe-any"
[dependencies.intertwine]
git = "https://github.com/reem/rust-intertwine"
[dependencies.move-acceptor] [dependencies.move-acceptor]
git = "https://github.com/reem/rust-move-acceptor" git = "https://github.com/reem/rust-move-acceptor"

View File

@@ -35,7 +35,7 @@ fn handle(mut incoming: Incoming) {
#[bench] #[bench]
fn bench_curl(b: &mut test::Bencher) { fn bench_curl(b: &mut test::Bencher) {
let mut listening = listen(); let mut listening = listen();
let s = format!("http://{}/", listening.sockets[0]); let s = format!("http://{}/", listening.socket);
let url = s.as_slice(); let url = s.as_slice();
b.iter(|| { b.iter(|| {
curl::http::handle() curl::http::handle()
@@ -67,7 +67,7 @@ impl hyper::header::HeaderFormat for Foo {
#[bench] #[bench]
fn bench_hyper(b: &mut test::Bencher) { fn bench_hyper(b: &mut test::Bencher) {
let mut listening = listen(); let mut listening = listen();
let s = format!("http://{}/", listening.sockets[0]); let s = format!("http://{}/", listening.socket);
let url = s.as_slice(); let url = s.as_slice();
b.iter(|| { b.iter(|| {
let mut req = hyper::client::Request::get(hyper::Url::parse(url).unwrap()).unwrap(); let mut req = hyper::client::Request::get(hyper::Url::parse(url).unwrap()).unwrap();
@@ -83,7 +83,7 @@ fn bench_hyper(b: &mut test::Bencher) {
#[bench] #[bench]
fn bench_http(b: &mut test::Bencher) { fn bench_http(b: &mut test::Bencher) {
let mut listening = listen(); let mut listening = listen();
let s = format!("http://{}/", listening.sockets[0]); let s = format!("http://{}/", listening.socket);
let url = s.as_slice(); let url = s.as_slice();
b.iter(|| { b.iter(|| {
let mut req: http::client::RequestWriter = http::client::RequestWriter::new( let mut req: http::client::RequestWriter = http::client::RequestWriter::new(

View File

@@ -31,7 +31,7 @@ fn bench_hyper(b: &mut Bencher) {
let server = hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 0); let server = hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 0);
let mut listener = server.listen(hyper_handle).unwrap(); let mut listener = server.listen(hyper_handle).unwrap();
let url = hyper::Url::parse(format!("http://{}", listener.sockets[0]).as_slice()).unwrap(); let url = hyper::Url::parse(format!("http://{}", listener.socket).as_slice()).unwrap();
b.iter(|| request(url.clone())); b.iter(|| request(url.clone()));
listener.close().unwrap(); listener.close().unwrap();
} }

View File

@@ -134,7 +134,6 @@ extern crate openssl;
#[cfg(test)] extern crate test; #[cfg(test)] extern crate test;
extern crate "unsafe-any" as uany; extern crate "unsafe-any" as uany;
extern crate "move-acceptor" as macceptor; extern crate "move-acceptor" as macceptor;
extern crate intertwine;
extern crate typeable; extern crate typeable;
extern crate cookie; extern crate cookie;

View File

@@ -1,10 +1,9 @@
//! HTTP Server //! HTTP Server
use std::io::{Listener, IoResult, EndOfFile}; use std::io::{Listener, EndOfFile};
use std::io::net::ip::{IpAddr, Port, SocketAddr}; use std::io::net::ip::{IpAddr, Port, SocketAddr};
use std::task::TaskBuilder; use std::task::TaskBuilder;
use intertwine::{Intertwine, Intertwined}; use macceptor::{MoveAcceptor, MoveConnections};
use macceptor::MoveAcceptor;
pub use self::request::Request; pub use self::request::Request;
pub use self::response::Response; pub use self::response::Response;
@@ -22,7 +21,8 @@ pub mod response;
/// Once listening, it will create a `Request`/`Response` pair for each /// Once listening, it will create a `Request`/`Response` pair for each
/// incoming connection, and hand them to the provided handler. /// incoming connection, and hand them to the provided handler.
pub struct Server<L = HttpListener> { pub struct Server<L = HttpListener> {
pairs: Vec<(IpAddr, Port)> ip: IpAddr,
port: Port
} }
macro_rules! try_option( macro_rules! try_option(
@@ -37,12 +37,10 @@ macro_rules! try_option(
impl Server<HttpListener> { impl Server<HttpListener> {
/// Creates a new server that will handle `HttpStream`s. /// Creates a new server that will handle `HttpStream`s.
pub fn http(ip: IpAddr, port: Port) -> Server { pub fn http(ip: IpAddr, port: Port) -> Server {
Server { pairs: vec![(ip, port)] } Server {
} ip: ip,
port: port
/// Creates a server that can listen to many (ip, port) pairs. }
pub fn many(pairs: Vec<(IpAddr, Port)>) -> Server {
Server { pairs: pairs }
} }
} }
@@ -56,30 +54,21 @@ impl<L: NetworkListener<S, A>, S: NetworkStream, A: NetworkAcceptor<S>> Server<L
S: NetworkStream, S: NetworkStream,
A: NetworkAcceptor<S>, A: NetworkAcceptor<S>,
L: NetworkListener<S, A>, { L: NetworkListener<S, A>, {
let mut acceptors = Vec::new(); debug!("binding to {}:{}", self.ip, self.port);
let mut sockets = Vec::new(); let mut listener: L = try_io!(NetworkListener::<S, A>::bind((self.ip, self.port)));
for (ip, port) in self.pairs.into_iter() {
debug!("binding to {}:{}", ip, port);
let mut listener: L = try_io!(NetworkListener::<S, A>::bind((ip, port)));
sockets.push(try_io!(listener.socket_name())); let socket = try_io!(listener.socket_name());
let acceptor = try_io!(listener.listen()); let acceptor = try_io!(listener.listen());
acceptors.push(acceptor.clone());
}
let connections = acceptors.clone()
.into_iter()
.map(|acceptor| acceptor.move_incoming())
.intertwine();
let captured = acceptor.clone();
TaskBuilder::new().named("hyper acceptor").spawn(proc() { TaskBuilder::new().named("hyper acceptor").spawn(proc() {
handler.handle(Incoming { from: connections }); handler.handle(Incoming { from: captured.move_incoming() });
}); });
Ok(Listening { Ok(Listening {
acceptors: acceptors, acceptor: acceptor,
sockets: sockets, socket: socket,
}) })
} }
@@ -90,11 +79,11 @@ impl<L: NetworkListener<S, A>, S: NetworkStream, A: NetworkAcceptor<S>> Server<L
} }
/// An iterator over incoming `Connection`s. /// An iterator over incoming `Connection`s.
pub struct Incoming<S: Send = HttpStream> { pub struct Incoming<A = HttpAcceptor> {
from: Intertwined<IoResult<S>> from: MoveConnections<A>
} }
impl<S: NetworkStream + 'static> Iterator<Connection<S>> for Incoming<S> { impl<S: NetworkStream + 'static, A: NetworkAcceptor<S>> Iterator<Connection<S>> for Incoming<A> {
fn next(&mut self) -> Option<Connection<S>> { fn next(&mut self) -> Option<Connection<S>> {
for conn in self.from { for conn in self.from {
match conn { match conn {
@@ -130,9 +119,9 @@ impl<S: NetworkStream + 'static> Connection<S> {
/// A listening server, which can later be closed. /// A listening server, which can later be closed.
pub struct Listening<A = HttpAcceptor> { pub struct Listening<A = HttpAcceptor> {
acceptors: Vec<A>, acceptor: A,
/// The socket addresses that the server is bound to. /// The socket addresses that the server is bound to.
pub sockets: Vec<SocketAddr>, pub socket: SocketAddr,
} }
impl<A: NetworkAcceptor<S>, S: NetworkStream> Listening<A> { impl<A: NetworkAcceptor<S>, S: NetworkStream> Listening<A> {
@@ -142,9 +131,7 @@ impl<A: NetworkAcceptor<S>, S: NetworkStream> Listening<A> {
/// and does not close the rest of the acceptors. /// and does not close the rest of the acceptors.
pub fn close(&mut self) -> HttpResult<()> { pub fn close(&mut self) -> HttpResult<()> {
debug!("closing server"); debug!("closing server");
for acceptor in self.acceptors.iter_mut() { try_io!(self.acceptor.close());
try_io!(acceptor.close());
}
Ok(()) Ok(())
} }
} }
@@ -154,11 +141,11 @@ pub trait Handler<A: NetworkAcceptor<S>, S: NetworkStream>: Send {
/// Receives a `Request`/`Response` pair, and should perform some action on them. /// Receives a `Request`/`Response` pair, and should perform some action on them.
/// ///
/// This could reading from the request, and writing to the response. /// This could reading from the request, and writing to the response.
fn handle(self, Incoming<S>); fn handle(self, Incoming<A>);
} }
impl<A: NetworkAcceptor<S>, S: NetworkStream> Handler<A, S> for fn(Incoming<S>) { impl<A: NetworkAcceptor<S>, S: NetworkStream> Handler<A, S> for fn(Incoming<A>) {
fn handle(self, incoming: Incoming<S>) { fn handle(self, incoming: Incoming<A>) {
(self)(incoming) (self)(incoming)
} }
} }