From 76a58940d813c4c6d8e42700e19b53542ba2e29b Mon Sep 17 00:00:00 2001 From: Jonathan Reem Date: Tue, 9 Sep 2014 14:28:06 -0700 Subject: [PATCH] Use trait objects and dynamic dispatch to abstract over NetworkStream Server and client benchmarks show that this makes very little difference in performance and using dynamic dispatch here is significantly more ergonomic. This also bounds NetworkStream with Send to prevent incorrect implementations. Allows the implementation of mock streams for testing and flexibility. Fixes #5 --- examples/concurrent-server.rs | 4 ++-- src/net.rs | 15 ++++++++++++++- src/server/mod.rs | 14 ++++++-------- src/server/request.rs | 16 ++++++++-------- src/server/response.rs | 25 +++++++++++++------------ 5 files changed, 43 insertions(+), 31 deletions(-) diff --git a/examples/concurrent-server.rs b/examples/concurrent-server.rs index 3bd371ad..740312fb 100644 --- a/examples/concurrent-server.rs +++ b/examples/concurrent-server.rs @@ -13,7 +13,7 @@ use hyper::header::common::ContentLength; use hyper::net::{HttpStream, HttpAcceptor}; trait ConcurrentHandler: Send + Sync { - fn handle(&self, req: Request, res: Response); + fn handle(&self, req: Request, res: Response); } struct Concurrent { handler: Arc } @@ -39,7 +39,7 @@ macro_rules! try_abort( struct Echo; impl ConcurrentHandler for Echo { - fn handle(&self, mut req: Request, mut res: Response) { + fn handle(&self, mut req: Request, mut res: Response) { match req.uri { hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) { (&Get, "/") | (&Get, "/echo") => { diff --git a/src/net.rs b/src/net.rs index 44d6e6b7..943ba0a6 100644 --- a/src/net.rs +++ b/src/net.rs @@ -22,7 +22,7 @@ pub trait NetworkAcceptor: Acceptor + Clone + Send { } /// An abstraction over streams that a Server can utilize. -pub trait NetworkStream: Stream + Clone { +pub trait NetworkStream: Stream + Clone + Send { /// Get the remote address of the underlying connection. fn peer_name(&mut self) -> IoResult; @@ -30,6 +30,19 @@ pub trait NetworkStream: Stream + Clone { fn connect(host: &str, port: Port) -> IoResult; } +impl Reader for Box { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> IoResult { self.read(buf) } +} + +impl Writer for Box { + #[inline] + fn write(&mut self, msg: &[u8]) -> IoResult<()> { self.write(msg) } + + #[inline] + fn flush(&mut self) -> IoResult<()> { self.flush() } +} + /// A `NetworkListener` for `HttpStream`s. pub struct HttpListener { inner: TcpListener diff --git a/src/server/mod.rs b/src/server/mod.rs index 62ad2554..6362df0c 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -6,7 +6,7 @@ pub use self::request::Request; pub use self::response::{Response, Fresh, Streaming}; use net::{NetworkListener, NetworkAcceptor, NetworkStream}; -use net::{HttpListener, HttpAcceptor}; +use net::HttpListener; pub mod request; pub mod response; @@ -31,7 +31,6 @@ impl Server { } impl, S: NetworkStream, A: NetworkAcceptor> Server { - /// Creates a server that can listen for and handle `NetworkStreams`. pub fn new(ip: IpAddr, port: Port) -> Server { Server { @@ -40,9 +39,8 @@ impl, S: NetworkStream, A: NetworkAcceptor> Server + 'static>(self, handler: H) -> IoResult> { + pub fn listen>(self, handler: H) -> IoResult> { let mut listener: L = try!(NetworkListener::bind(self.ip.to_string().as_slice(), self.port)); let socket = try!(listener.socket_name()); let acceptor = try!(listener.listen()); @@ -62,12 +60,12 @@ impl, S: NetworkStream, A: NetworkAcceptor> Server { +pub struct Incoming<'a, A: 'a> { from: IncomingConnections<'a, A> } -impl<'a, A: NetworkAcceptor, S: NetworkStream> Iterator<(Request, Response)> for Incoming<'a, A> { - fn next(&mut self) -> Option<(Request, Response)> { +impl<'a, S: NetworkStream, A: NetworkAcceptor> Iterator<(Request, Response)> for Incoming<'a, A> { + fn next(&mut self) -> Option<(Request, Response)> { for conn in self.from { match conn { Ok(stream) => { @@ -96,7 +94,7 @@ impl<'a, A: NetworkAcceptor, S: NetworkStream> Iterator<(Request, Response } /// A listening server, which can later be closed. -pub struct Listening { +pub struct Listening { acceptor: A, /// The socket address that the server is bound to. pub socket_addr: SocketAddr, diff --git a/src/server/request.rs b/src/server/request.rs index 2480f6b6..81570d67 100644 --- a/src/server/request.rs +++ b/src/server/request.rs @@ -12,11 +12,11 @@ use header::Headers; use header::common::ContentLength; use rfc7230::{read_request_line}; use rfc7230::{HttpReader, SizedReader, ChunkedReader}; -use net::{NetworkStream, HttpStream}; +use net::NetworkStream; use uri::RequestUri; /// A request bundles several parts of an incoming `NetworkStream`, given to a `Handler`. -pub struct Request { +pub struct Request { /// The IP address of the remote connection. pub remote_addr: SocketAddr, /// The `Method`, such as `Get`, `Post`, etc. @@ -27,17 +27,17 @@ pub struct Request { pub uri: RequestUri, /// The version of HTTP for this request. pub version: HttpVersion, - body: HttpReader> + body: HttpReader>> } -impl Request { +impl Request { /// Create a new Request, reading the StartLine and Headers so they are /// immediately useful. - pub fn new(mut stream: S) -> HttpResult> { + pub fn new(mut stream: S) -> HttpResult { let remote_addr = try_io!(stream.peer_name()); - let mut stream = BufferedReader::new(stream); + let mut stream = BufferedReader::new(box stream as Box); let (method, uri, version) = try!(read_request_line(&mut stream)); let mut headers = try!(Headers::from_raw(&mut stream)); @@ -61,12 +61,12 @@ impl Request { uri: uri, headers: headers, version: version, - body: body, + body: body }) } } -impl Reader for Request { +impl Reader for Request { fn read(&mut self, buf: &mut [u8]) -> IoResult { self.body.read(buf) } diff --git a/src/server/response.rs b/src/server/response.rs index 3ae34d67..165979fb 100644 --- a/src/server/response.rs +++ b/src/server/response.rs @@ -26,18 +26,18 @@ impl WriteStatus for Streaming {} impl WriteStatus for Fresh {} /// The outgoing half for a Tcp connection, created by a `Server` and given to a `Handler`. -pub struct Response { +pub struct Response { /// The HTTP version of this response. pub version: version::HttpVersion, // Stream the Response is writing to, not accessible through UnwrittenResponse - body: BufferedWriter, // TODO: use a HttpWriter from rfc7230 + body: BufferedWriter>, // TODO: use a HttpWriter from rfc7230 // The status code for the request. status: status::StatusCode, // The outgoing headers on this response. headers: header::Headers } -impl Response { +impl Response { /// The status of this response. #[inline] pub fn status(&self) -> status::StatusCode { self.status } @@ -47,9 +47,9 @@ impl Response { /// Construct a Response from its constituent parts. pub fn construct(version: version::HttpVersion, - body: BufferedWriter, + body: BufferedWriter>, status: status::StatusCode, - headers: header::Headers) -> Response { + headers: header::Headers) -> Response { Response { status: status, version: version, @@ -59,19 +59,19 @@ impl Response { } } -impl Response { +impl Response { /// Creates a new Response that can be used to write to a network stream. - pub fn new(stream: S) -> Response { + pub fn new(stream: S) -> Response { Response { status: status::Ok, version: version::Http11, headers: header::Headers::new(), - body: BufferedWriter::new(stream) + body: BufferedWriter::new(box stream as Box) } } /// Consume this Response, writing the Headers and Status and creating a Response - pub fn start(mut self) -> IoResult> { + pub fn start(mut self) -> IoResult> { debug!("writing head: {} {}", self.version, self.status); try!(write!(self.body, "{} {}{}{}", self.version, self.status, CR as char, LF as char)); @@ -104,12 +104,13 @@ impl Response { pub fn headers_mut(&mut self) -> &mut header::Headers { &mut self.headers } /// Deconstruct this Response into its constituent parts. - pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter, status::StatusCode, header::Headers) { + pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter>, + status::StatusCode, header::Headers) { (self.version, self.body, self.status, self.headers) } } -impl Response { +impl Response { /// Flushes all writing of a response to the client. pub fn end(mut self) -> IoResult<()> { debug!("ending"); @@ -117,7 +118,7 @@ impl Response { } } -impl Writer for Response { +impl Writer for Response { fn write(&mut self, msg: &[u8]) -> IoResult<()> { debug!("write {:u} bytes", msg.len()); self.body.write(msg)