feat(server): keep-alive!
Internals have been shuffled around such that Request and Reponse are now given only a mutable reference to the stream, instead of being allowed to consume it. This allows the server to re-use the streams if keep-alive is true. A task pool is used, and the number of the threads can currently be adjusted by using the `listen_threads()` method on Server. [breaking-change]
This commit is contained in:
@@ -1,17 +1,22 @@
|
||||
//! HTTP Server
|
||||
use std::io::{Listener, EndOfFile};
|
||||
use std::io::{Listener, EndOfFile, BufferedReader, BufferedWriter};
|
||||
use std::io::net::ip::{IpAddr, Port, SocketAddr};
|
||||
use std::os;
|
||||
use std::sync::{Arc, TaskPool};
|
||||
use std::task::TaskBuilder;
|
||||
|
||||
use macceptor::{MoveAcceptor, MoveConnections};
|
||||
|
||||
pub use self::request::Request;
|
||||
pub use self::response::Response;
|
||||
|
||||
pub use net::{Fresh, Streaming};
|
||||
|
||||
use {HttpResult};
|
||||
use header::common::Connection;
|
||||
use header::common::connection::{KeepAlive, Close};
|
||||
use net::{NetworkListener, NetworkAcceptor, NetworkStream,
|
||||
HttpAcceptor, HttpListener, HttpStream,
|
||||
Fresh};
|
||||
HttpAcceptor, HttpListener, HttpStream};
|
||||
use version::HttpVersion::{Http10, Http11};
|
||||
|
||||
pub mod request;
|
||||
pub mod response;
|
||||
@@ -45,12 +50,12 @@ impl Server<HttpListener> {
|
||||
}
|
||||
|
||||
impl<L: NetworkListener<S, A>, S: NetworkStream, A: NetworkAcceptor<S>> Server<L> {
|
||||
/// Binds to a socket, and starts handling connections.
|
||||
/// Binds to a socket, and starts handling connections using a task pool.
|
||||
///
|
||||
/// This method has unbound type parameters, so can be used when you want to use
|
||||
/// something other than the provided HttpStream, HttpAcceptor, and HttpListener.
|
||||
pub fn listen_network<H, S, A, L>(self, handler: H) -> HttpResult<Listening<A>>
|
||||
where H: Handler<A, S>,
|
||||
pub fn listen_network<H, S, A, L>(self, handler: H, threads: uint) -> HttpResult<Listening<A>>
|
||||
where H: Handler,
|
||||
S: NetworkStream,
|
||||
A: NetworkAcceptor<S>,
|
||||
L: NetworkListener<S, A>, {
|
||||
@@ -61,9 +66,61 @@ impl<L: NetworkListener<S, A>, S: NetworkStream, A: NetworkAcceptor<S>> Server<L
|
||||
|
||||
let acceptor = try!(listener.listen());
|
||||
|
||||
let captured = acceptor.clone();
|
||||
let mut captured = acceptor.clone();
|
||||
TaskBuilder::new().named("hyper acceptor").spawn(proc() {
|
||||
handler.handle(Incoming { from: captured.move_incoming() });
|
||||
let handler = Arc::new(handler);
|
||||
debug!("threads = {}", threads);
|
||||
let pool = TaskPool::new(threads);
|
||||
for conn in captured.incoming() {
|
||||
match conn {
|
||||
Ok(mut stream) => {
|
||||
debug!("Incoming stream");
|
||||
let handler = handler.clone();
|
||||
pool.execute(proc() {
|
||||
let addr = match stream.peer_name() {
|
||||
Ok(addr) => addr,
|
||||
Err(e) => {
|
||||
error!("Peer Name error: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut rdr = BufferedReader::new(stream.clone());
|
||||
let mut wrt = BufferedWriter::new(stream);
|
||||
|
||||
let mut keep_alive = true;
|
||||
while keep_alive {
|
||||
let mut res = Response::new(&mut wrt);
|
||||
let req = match Request::new(&mut rdr, addr) {
|
||||
Ok(req) => req,
|
||||
Err(e) => {
|
||||
//TODO: send a 400 response
|
||||
error!("request error: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
keep_alive = match (req.version, req.headers.get::<Connection>()) {
|
||||
(Http10, Some(conn)) if !conn.0.contains(&KeepAlive) => false,
|
||||
(Http11, Some(conn)) if conn.0.contains(&Close) => false,
|
||||
_ => true
|
||||
};
|
||||
res.version = req.version;
|
||||
handler.handle(req, res);
|
||||
debug!("keep_alive = {}", keep_alive);
|
||||
}
|
||||
|
||||
});
|
||||
},
|
||||
Err(ref e) if e.kind == EndOfFile => {
|
||||
debug!("server closed");
|
||||
break;
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Connection failed: {}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Listening {
|
||||
@@ -72,49 +129,16 @@ impl<L: NetworkListener<S, A>, S: NetworkStream, A: NetworkAcceptor<S>> Server<L
|
||||
})
|
||||
}
|
||||
|
||||
/// Binds to a socket and starts handling connections with the specified number of tasks.
|
||||
pub fn listen_threads<H: Handler>(self, handler: H, threads: uint) -> HttpResult<Listening<HttpAcceptor>> {
|
||||
self.listen_network::<H, HttpStream, HttpAcceptor, HttpListener>(handler, threads)
|
||||
}
|
||||
|
||||
/// Binds to a socket and starts handling connections.
|
||||
pub fn listen<H: Handler<HttpAcceptor, HttpStream>>(self, handler: H) -> HttpResult<Listening<HttpAcceptor>> {
|
||||
self.listen_network::<H, HttpStream, HttpAcceptor, HttpListener>(handler)
|
||||
pub fn listen<H: Handler>(self, handler: H) -> HttpResult<Listening<HttpAcceptor>> {
|
||||
self.listen_threads(handler, os::num_cpus() * 5 / 4)
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator over incoming `Connection`s.
|
||||
pub struct Incoming<A = HttpAcceptor> {
|
||||
from: MoveConnections<A>
|
||||
}
|
||||
|
||||
impl<S: NetworkStream + 'static, A: NetworkAcceptor<S>> Iterator<Connection<S>> for Incoming<A> {
|
||||
fn next(&mut self) -> Option<Connection<S>> {
|
||||
for conn in self.from {
|
||||
match conn {
|
||||
Ok(stream) => {
|
||||
debug!("Incoming stream");
|
||||
return Some(Connection(stream));
|
||||
},
|
||||
Err(ref e) if e.kind == EndOfFile => return None, // server closed
|
||||
Err(e) => {
|
||||
error!("Connection failed: {}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// An incoming connection. It can be opened to receive a request/response pair.
|
||||
pub struct Connection<S: Send = HttpStream>(S);
|
||||
|
||||
impl<S: NetworkStream + 'static> Connection<S> {
|
||||
/// Opens the incoming connection, parsing it into a Request/Response pair.
|
||||
pub fn open(self) -> HttpResult<(Request, Response<Fresh>)> {
|
||||
let stream = self.0;
|
||||
let clone = stream.clone();
|
||||
let req = try!(Request::new(stream));
|
||||
let mut res = Response::new(clone);
|
||||
res.version = req.version;
|
||||
return Ok((req, res))
|
||||
}
|
||||
}
|
||||
|
||||
/// A listening server, which can later be closed.
|
||||
@@ -125,10 +149,7 @@ pub struct Listening<A = HttpAcceptor> {
|
||||
}
|
||||
|
||||
impl<A: NetworkAcceptor<S>, S: NetworkStream> Listening<A> {
|
||||
/// Stop the server from listening to all of its socket addresses.
|
||||
///
|
||||
/// If closing any of the servers acceptors fails, this function returns Err
|
||||
/// and does not close the rest of the acceptors.
|
||||
/// Stop the server from listening to its socket address.
|
||||
pub fn close(&mut self) -> HttpResult<()> {
|
||||
debug!("closing server");
|
||||
try!(self.acceptor.close());
|
||||
@@ -137,16 +158,16 @@ impl<A: NetworkAcceptor<S>, S: NetworkStream> Listening<A> {
|
||||
}
|
||||
|
||||
/// A handler that can handle incoming requests for a server.
|
||||
pub trait Handler<A: NetworkAcceptor<S>, S: NetworkStream>: Send {
|
||||
pub trait Handler: Sync + Send {
|
||||
/// Receives a `Request`/`Response` pair, and should perform some action on them.
|
||||
///
|
||||
/// This could reading from the request, and writing to the response.
|
||||
fn handle(self, Incoming<A>);
|
||||
fn handle(&self, Request, Response<Fresh>);
|
||||
}
|
||||
|
||||
impl<A: NetworkAcceptor<S>, S: NetworkStream> Handler<A, S> for fn(Incoming<A>) {
|
||||
fn handle(self, incoming: Incoming<A>) {
|
||||
(self)(incoming)
|
||||
impl Handler for fn(Request, Response<Fresh>) {
|
||||
fn handle(&self, req: Request, res: Response<Fresh>) {
|
||||
(*self)(req, res)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user