From 3cd9b10bcb4faaa244a4b4704ef7f77048433a37 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 12 Nov 2014 15:17:41 -0800 Subject: [PATCH] feat(server): keep-alive! Internals have been shuffled around such that Request and Reponse are now given only a mutable reference to the stream, instead of being allowed to consume it. This allows the server to re-use the streams if keep-alive is true. A task pool is used, and the number of the threads can currently be adjusted by using the `listen_threads()` method on Server. [breaking-change] --- Cargo.toml | 3 - benches/client.rs | 17 ++-- benches/server.rs | 12 ++- examples/concurrent-server.rs | 77 ------------------ examples/hello.rs | 19 ++--- examples/server.rs | 58 +++++++------- src/header/common/connection.rs | 2 +- src/lib.rs | 12 +-- src/net.rs | 13 +++ src/server/mod.rs | 137 ++++++++++++++++++-------------- src/server/request.rs | 20 +++-- src/server/response.rs | 29 +++---- 12 files changed, 167 insertions(+), 232 deletions(-) delete mode 100644 examples/concurrent-server.rs diff --git a/Cargo.toml b/Cargo.toml index 4d179a0b..63ac4cd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,9 +16,6 @@ git = "https://github.com/hyperium/mime.rs" [dependencies.unsafe-any] git = "https://github.com/reem/rust-unsafe-any" -[dependencies.move-acceptor] -git = "https://github.com/reem/rust-move-acceptor" - [dependencies.typeable] git = "https://github.com/reem/rust-typeable" diff --git a/benches/client.rs b/benches/client.rs index 266cf043..6d62880e 100644 --- a/benches/client.rs +++ b/benches/client.rs @@ -7,28 +7,25 @@ extern crate test; use std::fmt::{mod, Show}; use std::io::net::ip::Ipv4Addr; -use hyper::server::{Incoming, Server}; +use hyper::server::{Request, Response, Server}; fn listen() -> hyper::server::Listening { let server = Server::http(Ipv4Addr(127, 0, 0, 1), 0); server.listen(handle).unwrap() } -macro_rules! try_continue( +macro_rules! try_return( ($e:expr) => {{ match $e { Ok(v) => v, - Err(..) => continue + Err(..) => return } }}) -fn handle(mut incoming: Incoming) { - for conn in incoming { - let (_, res) = try_continue!(conn.open()); - let mut res = try_continue!(res.start()); - try_continue!(res.write(b"Benchmarking hyper vs others!")) - try_continue!(res.end()); - } +fn handle(_: Request, res: Response) { + let mut res = try_return!(res.start()); + try_return!(res.write(b"Benchmarking hyper vs others!")) + try_return!(res.end()); } diff --git a/benches/server.rs b/benches/server.rs index 8fc701f3..378ba4ec 100644 --- a/benches/server.rs +++ b/benches/server.rs @@ -9,6 +9,7 @@ use test::Bencher; use std::io::net::ip::{SocketAddr, Ipv4Addr}; use http::server::Server; +use hyper::server::{Request, Response}; static PHRASE: &'static [u8] = b"Benchmarking hyper vs others!"; @@ -17,13 +18,10 @@ fn request(url: hyper::Url) { req.start().unwrap().send().unwrap().read_to_string().unwrap(); } -fn hyper_handle(mut incoming: hyper::server::Incoming) { - for conn in incoming { - let (_, res) = conn.open().unwrap(); - let mut res = res.start().unwrap(); - res.write(PHRASE).unwrap(); - res.end().unwrap(); - } +fn hyper_handle(_: Request, res: Response) { + let mut res = res.start().unwrap(); + res.write(PHRASE).unwrap(); + res.end().unwrap(); } #[bench] diff --git a/examples/concurrent-server.rs b/examples/concurrent-server.rs deleted file mode 100644 index f45ad130..00000000 --- a/examples/concurrent-server.rs +++ /dev/null @@ -1,77 +0,0 @@ -#![feature(macro_rules, default_type_params)] - -extern crate hyper; - -use std::io::util::copy; -use std::io::net::ip::Ipv4Addr; -use std::sync::Arc; - -use hyper::{Get, Post}; -use hyper::server::{Server, Handler, Incoming, Request, Response}; -use hyper::header::common::ContentLength; -use hyper::net::{HttpStream, HttpAcceptor, Fresh}; - -macro_rules! try_abort( - ($e:expr) => {{ - match $e { - Ok(v) => v, - Err(..) => return - } - }} -) - -trait ConcurrentHandler: Send + Sync { - fn handle(&self, req: Request, res: Response); -} - -struct Concurrent { handler: Arc } - -impl Handler for Concurrent { - fn handle(self, mut incoming: Incoming) { - for conn in incoming { - let clone = self.handler.clone(); - spawn(proc() { - let (req, res) = try_abort!(conn.open()); - clone.handle(req, res); - }) - } - } -} - -struct Echo; - -impl ConcurrentHandler for Echo { - fn handle(&self, mut req: Request, mut res: Response) { - match req.uri { - hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) { - (&Get, "/") | (&Get, "/echo") => { - let out = b"Try POST /echo"; - - res.headers_mut().set(ContentLength(out.len())); - let mut res = try_abort!(res.start()); - try_abort!(res.write(out)); - try_abort!(res.end()); - return; - }, - (&Post, "/echo") => (), // fall through, fighting mutable borrows - _ => { - *res.status_mut() = hyper::status::NotFound; - try_abort!(res.start().and_then(|res| res.end())); - return; - } - }, - _ => { - try_abort!(res.start().and_then(|res| res.end())); - return; - } - } - let mut res = try_abort!(res.start()); - try_abort!(copy(&mut req, &mut res)); - try_abort!(res.end()); - } -} - -fn main() { - let server = Server::http(Ipv4Addr(127, 0, 0, 1), 3000); - server.listen(Concurrent { handler: Arc::new(Echo) }).unwrap(); -} diff --git a/examples/hello.rs b/examples/hello.rs index c7a06b54..0f92d6bd 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -1,23 +1,16 @@ extern crate hyper; -use std::sync::TaskPool; use std::io::net::ip::Ipv4Addr; +use hyper::server::{Request, Response}; static PHRASE: &'static [u8] = b"Hello World!"; -fn hyper_handle(mut incoming: hyper::server::Incoming) { - let pool = TaskPool::new(100); - - for conn in incoming { - pool.execute(proc() { - let (_, res) = conn.open().unwrap(); - let mut res = res.start().unwrap(); - res.write(PHRASE).unwrap(); - res.end().unwrap(); - }); - } +fn hello(_: Request, res: Response) { + let mut res = res.start().unwrap(); + res.write(PHRASE).unwrap(); + res.end().unwrap(); } fn main() { - hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000).listen(hyper_handle).unwrap(); + hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000).listen(hello).unwrap(); } diff --git a/examples/server.rs b/examples/server.rs index 09c9bbfc..3bcbfd6e 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,54 +1,52 @@ -#![feature(macro_rules)] +#![feature(macro_rules, phase)] extern crate hyper; +#[phase(plugin, link)] extern crate log; use std::io::util::copy; use std::io::net::ip::Ipv4Addr; use hyper::{Get, Post}; use hyper::header::common::ContentLength; -use hyper::server::{Server, Incoming}; +use hyper::server::{Server, Request, Response}; -macro_rules! try_continue( +macro_rules! try_return( ($e:expr) => {{ match $e { Ok(v) => v, - Err(e) => { println!("Error: {}", e); continue; } + Err(e) => { error!("Error: {}", e); return; } } }} ) -fn echo(mut incoming: Incoming) { - for conn in incoming { - let (mut req, mut res) = try_continue!(conn.open()); - match req.uri { - hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) { - (&Get, "/") | (&Get, "/echo") => { - let out = b"Try POST /echo"; +fn echo(mut req: Request, mut res: Response) { + match req.uri { + hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) { + (&Get, "/") | (&Get, "/echo") => { + let out = b"Try POST /echo"; - res.headers_mut().set(ContentLength(out.len())); - let mut res = try_continue!(res.start()); - try_continue!(res.write(out)); - try_continue!(res.end()); - continue; - }, - (&Post, "/echo") => (), // fall through, fighting mutable borrows - _ => { - *res.status_mut() = hyper::status::NotFound; - try_continue!(res.start().and_then(|res| res.end())); - continue; - } + res.headers_mut().set(ContentLength(out.len())); + let mut res = try_return!(res.start()); + try_return!(res.write(out)); + try_return!(res.end()); + return; }, + (&Post, "/echo") => (), // fall through, fighting mutable borrows _ => { - try_continue!(res.start().and_then(|res| res.end())); - continue; + *res.status_mut() = hyper::status::NotFound; + try_return!(res.start().and_then(|res| res.end())); + return; } - }; + }, + _ => { + try_return!(res.start().and_then(|res| res.end())); + return; + } + }; - let mut res = try_continue!(res.start()); - try_continue!(copy(&mut req, &mut res)); - try_continue!(res.end()); - } + let mut res = try_return!(res.start()); + try_return!(copy(&mut req, &mut res)); + try_return!(res.end()); } fn main() { diff --git a/src/header/common/connection.rs b/src/header/common/connection.rs index 405a655e..a868c785 100644 --- a/src/header/common/connection.rs +++ b/src/header/common/connection.rs @@ -3,7 +3,7 @@ use std::fmt::{mod, Show}; use std::str::FromStr; use super::util::{from_comma_delimited, fmt_comma_delimited}; -use self::ConnectionOption::{KeepAlive, Close, ConnectionHeader}; +pub use self::ConnectionOption::{KeepAlive, Close, ConnectionHeader}; /// The `Connection` header. #[deriving(Clone, PartialEq, Show)] diff --git a/src/lib.rs b/src/lib.rs index 088362a3..6aa484d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,10 +66,10 @@ //! //! #### Handler + Server //! -//! A Handler in Hyper just accepts an Iterator of `(Request, Response)` pairs and -//! does whatever it wants with it. This gives Handlers maximum flexibility to decide -//! on concurrency strategy and exactly how they want to distribute the work of -//! dealing with `Request` and `Response.` +//! A `Handler` in Hyper accepts a `Request` and `Response`. This is where +//! user-code can handle each connection. The server accepts connections in a +//! task pool with a customizable number of threads, and passes the Request / +//! Response to the handler. //! //! #### Request //! @@ -133,7 +133,6 @@ extern crate openssl; #[phase(plugin,link)] extern crate log; #[cfg(test)] extern crate test; extern crate "unsafe-any" as uany; -extern crate "move-acceptor" as macceptor; extern crate typeable; extern crate cookie; @@ -252,7 +251,4 @@ impl FromError for HttpError { fn _assert_send() { _assert_send::>(); _assert_send::(); - - _assert_send::(); - _assert_send::>(); } diff --git a/src/net.rs b/src/net.rs index 9c2d0a31..9d17e0da 100644 --- a/src/net.rs +++ b/src/net.rs @@ -82,6 +82,19 @@ impl Writer for Box { fn flush(&mut self) -> IoResult<()> { (**self).flush() } } +impl<'a> Reader for &'a mut NetworkStream { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> IoResult { (**self).read(buf) } +} + +impl<'a> Writer for &'a mut NetworkStream { + #[inline] + fn write(&mut self, msg: &[u8]) -> IoResult<()> { (**self).write(msg) } + + #[inline] + fn flush(&mut self) -> IoResult<()> { (**self).flush() } +} + impl UncheckedBoxAnyDowncast for Box { unsafe fn downcast_unchecked(self) -> Box { let to = *mem::transmute::<&Box, &raw::TraitObject>(&self); diff --git a/src/server/mod.rs b/src/server/mod.rs index 55638b11..76439834 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,17 +1,22 @@ //! HTTP Server -use std::io::{Listener, EndOfFile}; +use std::io::{Listener, EndOfFile, BufferedReader, BufferedWriter}; use std::io::net::ip::{IpAddr, Port, SocketAddr}; +use std::os; +use std::sync::{Arc, TaskPool}; use std::task::TaskBuilder; -use macceptor::{MoveAcceptor, MoveConnections}; pub use self::request::Request; pub use self::response::Response; +pub use net::{Fresh, Streaming}; + use {HttpResult}; +use header::common::Connection; +use header::common::connection::{KeepAlive, Close}; use net::{NetworkListener, NetworkAcceptor, NetworkStream, - HttpAcceptor, HttpListener, HttpStream, - Fresh}; + HttpAcceptor, HttpListener, HttpStream}; +use version::HttpVersion::{Http10, Http11}; pub mod request; pub mod response; @@ -45,12 +50,12 @@ impl Server { } impl, S: NetworkStream, A: NetworkAcceptor> Server { - /// Binds to a socket, and starts handling connections. + /// Binds to a socket, and starts handling connections using a task pool. /// /// This method has unbound type parameters, so can be used when you want to use /// something other than the provided HttpStream, HttpAcceptor, and HttpListener. - pub fn listen_network(self, handler: H) -> HttpResult> - where H: Handler, + pub fn listen_network(self, handler: H, threads: uint) -> HttpResult> + where H: Handler, S: NetworkStream, A: NetworkAcceptor, L: NetworkListener, { @@ -61,9 +66,61 @@ impl, S: NetworkStream, A: NetworkAcceptor> Server { + debug!("Incoming stream"); + let handler = handler.clone(); + pool.execute(proc() { + let addr = match stream.peer_name() { + Ok(addr) => addr, + Err(e) => { + error!("Peer Name error: {}", e); + return; + } + }; + let mut rdr = BufferedReader::new(stream.clone()); + let mut wrt = BufferedWriter::new(stream); + + let mut keep_alive = true; + while keep_alive { + let mut res = Response::new(&mut wrt); + let req = match Request::new(&mut rdr, addr) { + Ok(req) => req, + Err(e) => { + //TODO: send a 400 response + error!("request error: {}", e); + return; + } + }; + + keep_alive = match (req.version, req.headers.get::()) { + (Http10, Some(conn)) if !conn.0.contains(&KeepAlive) => false, + (Http11, Some(conn)) if conn.0.contains(&Close) => false, + _ => true + }; + res.version = req.version; + handler.handle(req, res); + debug!("keep_alive = {}", keep_alive); + } + + }); + }, + Err(ref e) if e.kind == EndOfFile => { + debug!("server closed"); + break; + }, + Err(e) => { + error!("Connection failed: {}", e); + continue; + } + } + } }); Ok(Listening { @@ -72,49 +129,16 @@ impl, S: NetworkStream, A: NetworkAcceptor> Server(self, handler: H, threads: uint) -> HttpResult> { + self.listen_network::(handler, threads) + } + /// Binds to a socket and starts handling connections. - pub fn listen>(self, handler: H) -> HttpResult> { - self.listen_network::(handler) + pub fn listen(self, handler: H) -> HttpResult> { + self.listen_threads(handler, os::num_cpus() * 5 / 4) } -} -/// An iterator over incoming `Connection`s. -pub struct Incoming { - from: MoveConnections -} - -impl> Iterator> for Incoming { - fn next(&mut self) -> Option> { - for conn in self.from { - match conn { - Ok(stream) => { - debug!("Incoming stream"); - return Some(Connection(stream)); - }, - Err(ref e) if e.kind == EndOfFile => return None, // server closed - Err(e) => { - error!("Connection failed: {}", e); - continue; - } - } - } - None - } -} - -/// An incoming connection. It can be opened to receive a request/response pair. -pub struct Connection(S); - -impl Connection { - /// Opens the incoming connection, parsing it into a Request/Response pair. - pub fn open(self) -> HttpResult<(Request, Response)> { - let stream = self.0; - let clone = stream.clone(); - let req = try!(Request::new(stream)); - let mut res = Response::new(clone); - res.version = req.version; - return Ok((req, res)) - } } /// A listening server, which can later be closed. @@ -125,10 +149,7 @@ pub struct Listening { } impl, S: NetworkStream> Listening { - /// Stop the server from listening to all of its socket addresses. - /// - /// If closing any of the servers acceptors fails, this function returns Err - /// and does not close the rest of the acceptors. + /// Stop the server from listening to its socket address. pub fn close(&mut self) -> HttpResult<()> { debug!("closing server"); try!(self.acceptor.close()); @@ -137,16 +158,16 @@ impl, S: NetworkStream> Listening { } /// A handler that can handle incoming requests for a server. -pub trait Handler, S: NetworkStream>: Send { +pub trait Handler: Sync + Send { /// Receives a `Request`/`Response` pair, and should perform some action on them. /// /// This could reading from the request, and writing to the response. - fn handle(self, Incoming); + fn handle(&self, Request, Response); } -impl, S: NetworkStream> Handler for fn(Incoming) { - fn handle(self, incoming: Incoming) { - (self)(incoming) +impl Handler for fn(Request, Response) { + fn handle(&self, req: Request, res: Response) { + (*self)(req, res) } } diff --git a/src/server/request.rs b/src/server/request.rs index 9c613a03..f15df58b 100644 --- a/src/server/request.rs +++ b/src/server/request.rs @@ -2,7 +2,7 @@ //! //! These are requests that a `hyper::Server` receives, and include its method, //! target URI, headers, and message body. -use std::io::{Reader, BufferedReader, IoResult}; +use std::io::IoResult; use std::io::net::ip::SocketAddr; use {HttpResult}; @@ -13,11 +13,12 @@ use header::common::ContentLength; use http::{read_request_line}; use http::HttpReader; use http::HttpReader::{SizedReader, ChunkedReader}; -use net::NetworkStream; use uri::RequestUri; +pub type InternalReader<'a> = &'a mut Reader + 'a; + /// A request bundles several parts of an incoming `NetworkStream`, given to a `Handler`. -pub struct Request { +pub struct Request<'a> { /// The IP address of the remote connection. pub remote_addr: SocketAddr, /// The `Method`, such as `Get`, `Post`, etc. @@ -28,18 +29,15 @@ pub struct Request { pub uri: RequestUri, /// The version of HTTP for this request. pub version: HttpVersion, - body: HttpReader>> + body: HttpReader> } -impl Request { +impl<'a> Request<'a> { /// Create a new Request, reading the StartLine and Headers so they are /// immediately useful. - pub fn new(mut stream: S) -> HttpResult { - let remote_addr = try!(stream.peer_name()); - debug!("remote addr = {}", remote_addr); - let mut stream = BufferedReader::new(box stream as Box); + pub fn new(mut stream: InternalReader<'a>, addr: SocketAddr) -> HttpResult> { let (method, uri, version) = try!(read_request_line(&mut stream)); debug!("Request Line: {} {} {}", method, uri, version); let headers = try!(Headers::from_raw(&mut stream)); @@ -57,7 +55,7 @@ impl Request { }; Ok(Request { - remote_addr: remote_addr, + remote_addr: addr, method: method, uri: uri, headers: headers, @@ -67,7 +65,7 @@ impl Request { } } -impl Reader for Request { +impl<'a> Reader for Request<'a> { fn read(&mut self, buf: &mut [u8]) -> IoResult { self.body.read(buf) } diff --git a/src/server/response.rs b/src/server/response.rs index c2605b0c..f9d8992f 100644 --- a/src/server/response.rs +++ b/src/server/response.rs @@ -2,7 +2,7 @@ //! //! These are responses sent by a `hyper::Server` to clients, after //! receiving a request. -use std::io::{BufferedWriter, IoResult}; +use std::io::IoResult; use time::now_utc; @@ -11,23 +11,24 @@ use header::common; use http::{CR, LF, LINE_ENDING, HttpWriter}; use http::HttpWriter::{ThroughWriter, ChunkedWriter, SizedWriter}; use status; -use net::{NetworkStream, Fresh, Streaming}; +use net::{Fresh, Streaming}; use version; +pub type InternalWriter<'a> = &'a mut Writer + 'a; /// The outgoing half for a Tcp connection, created by a `Server` and given to a `Handler`. -pub struct Response { +pub struct Response<'a, W = Fresh> { /// The HTTP version of this response. pub version: version::HttpVersion, // Stream the Response is writing to, not accessible through UnwrittenResponse - body: HttpWriter>>, + body: HttpWriter>, // The status code for the request. status: status::StatusCode, // The outgoing headers on this response. headers: header::Headers } -impl Response { +impl<'a, W> Response<'a, W> { /// The status of this response. #[inline] pub fn status(&self) -> status::StatusCode { self.status } @@ -37,9 +38,9 @@ impl Response { /// Construct a Response from its constituent parts. pub fn construct(version: version::HttpVersion, - body: HttpWriter>>, + body: HttpWriter>, status: status::StatusCode, - headers: header::Headers) -> Response { + headers: header::Headers) -> Response<'a, Fresh> { Response { status: status, version: version, @@ -49,25 +50,25 @@ impl Response { } /// Deconstruct this Response into its constituent parts. - pub fn deconstruct(self) -> (version::HttpVersion, HttpWriter>>, + pub fn deconstruct(self) -> (version::HttpVersion, HttpWriter>, status::StatusCode, header::Headers) { (self.version, self.body, self.status, self.headers) } } -impl Response { +impl<'a> Response<'a, Fresh> { /// Creates a new Response that can be used to write to a network stream. - pub fn new(stream: S) -> Response { + pub fn new(stream: InternalWriter<'a>) -> Response<'a, Fresh> { Response { status: status::StatusCode::Ok, version: version::HttpVersion::Http11, headers: header::Headers::new(), - body: ThroughWriter(BufferedWriter::new(box stream as Box)) + body: ThroughWriter(stream) } } /// Consume this Response, writing the Headers and Status and creating a Response - pub fn start(mut self) -> IoResult> { + pub fn start(mut self) -> IoResult> { debug!("writing head: {} {}", self.version, self.status); try!(write!(&mut self.body, "{} {}{}{}", self.version, self.status, CR as char, LF as char)); @@ -133,7 +134,7 @@ impl Response { pub fn headers_mut(&mut self) -> &mut header::Headers { &mut self.headers } } -impl Response { +impl<'a> Response<'a, Streaming> { /// Flushes all writing of a response to the client. pub fn end(self) -> IoResult<()> { debug!("ending"); @@ -142,7 +143,7 @@ impl Response { } } -impl Writer for Response { +impl<'a> Writer for Response<'a, Streaming> { fn write(&mut self, msg: &[u8]) -> IoResult<()> { debug!("write {} bytes", msg.len()); self.body.write(msg)