perf(http): changes http parsing to use httparse crate

httparse is a http1 stateless push parser. This not only speeds up
parsing right now with sync io, but will also be useful for when we get
async io, since it's push based instead of pull.

BREAKING CHANGE: Several public functions and types in the `http` module
  have been removed. They have been replaced with 2 methods that handle
  all of the http1 parsing.
This commit is contained in:
Sean McArthur
2015-03-12 14:01:52 -07:00
parent 003b6551cf
commit b87bb20f0c
15 changed files with 358 additions and 731 deletions

View File

@@ -45,6 +45,7 @@ macro_rules! try_option(
);
impl<'a, H: Handler, L: NetworkListener> Server<'a, H, L> {
/// Creates a new server with the provided handler.
pub fn new(handler: H) -> Server<'a, H, L> {
Server {
handler: handler,
@@ -97,7 +98,7 @@ S: NetworkStream + Clone + Send> Server<'a, H, L> {
debug!("threads = {:?}", threads);
let pool = ListenerPool::new(listener.clone());
let work = move |stream| handle_connection(stream, &handler);
let work = move |stream| keep_alive_loop(stream, &handler);
let guard = thread::scoped(move || pool.accept(work, threads));
@@ -109,7 +110,7 @@ S: NetworkStream + Clone + Send> Server<'a, H, L> {
}
fn handle_connection<S, H>(mut stream: S, handler: &H)
fn keep_alive_loop<'h, S, H>(mut stream: S, handler: &'h H)
where S: NetworkStream + Clone, H: Handler {
debug!("Incoming stream");
let addr = match stream.peer_addr() {
@@ -120,36 +121,47 @@ where S: NetworkStream + Clone, H: Handler {
}
};
let mut rdr = BufReader::new(stream.clone());
let mut stream_clone = stream.clone();
let mut rdr = BufReader::new(&mut stream_clone as &mut NetworkStream);
let mut wrt = BufWriter::new(stream);
let mut keep_alive = true;
while keep_alive {
let mut res = Response::new(&mut wrt);
let req = match Request::new(&mut rdr, addr) {
Ok(req) => req,
Err(e@HttpIoError(_)) => {
debug!("ioerror in keepalive loop = {:?}", e);
return;
}
Err(e) => {
//TODO: send a 400 response
error!("request error = {:?}", e);
return;
}
};
keep_alive = match (req.version, req.headers.get::<Connection>()) {
(Http10, Some(conn)) if !conn.contains(&KeepAlive) => false,
(Http11, Some(conn)) if conn.contains(&Close) => false,
_ => true
};
res.version = req.version;
handler.handle(req, res);
keep_alive = handle_connection(addr, &mut rdr, &mut wrt, handler);
debug!("keep_alive = {:?}", keep_alive);
}
}
fn handle_connection<'a, 'aa, 'h, S, H>(
addr: SocketAddr,
rdr: &'a mut BufReader<&'aa mut NetworkStream>,
wrt: &mut BufWriter<S>,
handler: &'h H
) -> bool where 'aa: 'a, S: NetworkStream, H: Handler {
let mut res = Response::new(wrt);
let req = match Request::<'a, 'aa>::new(rdr, addr) {
Ok(req) => req,
Err(e@HttpIoError(_)) => {
debug!("ioerror in keepalive loop = {:?}", e);
return false;
}
Err(e) => {
//TODO: send a 400 response
error!("request error = {:?}", e);
return false;
}
};
let keep_alive = match (req.version, req.headers.get::<Connection>()) {
(Http10, Some(conn)) if !conn.contains(&KeepAlive) => false,
(Http11, Some(conn)) if conn.contains(&Close) => false,
_ => true
};
res.version = req.version;
handler.handle(req, res);
keep_alive
}
/// A listening server, which can later be closed.
pub struct Listening {
_guard: JoinGuard<'static, ()>,
@@ -171,11 +183,11 @@ pub trait Handler: Sync + Send {
/// Receives a `Request`/`Response` pair, and should perform some action on them.
///
/// This could reading from the request, and writing to the response.
fn handle<'a>(&'a self, Request<'a>, Response<'a, Fresh>);
fn handle<'a, 'aa, 'b, 's>(&'s self, Request<'aa, 'a>, Response<'b, Fresh>);
}
impl<F> Handler for F where F: Fn(Request, Response<Fresh>), F: Sync + Send {
fn handle(&self, req: Request, res: Response<Fresh>) {
(*self)(req, res)
fn handle<'a, 'aa, 'b, 's>(&'s self, req: Request<'a, 'aa>, res: Response<'b, Fresh>) {
self(req, res)
}
}