Use trait objects and dynamic dispatch to abstract over NetworkStream
Server and client benchmarks show that this makes very little difference in performance and using dynamic dispatch here is significantly more ergonomic. This also bounds NetworkStream with Send to prevent incorrect implementations. Allows the implementation of mock streams for testing and flexibility. Fixes #5
This commit is contained in:
@@ -13,7 +13,7 @@ use hyper::header::common::ContentLength;
|
||||
use hyper::net::{HttpStream, HttpAcceptor};
|
||||
|
||||
trait ConcurrentHandler: Send + Sync {
|
||||
fn handle(&self, req: Request, res: Response<Fresh, HttpStream>);
|
||||
fn handle(&self, req: Request, res: Response<Fresh>);
|
||||
}
|
||||
|
||||
struct Concurrent<H: ConcurrentHandler> { handler: Arc<H> }
|
||||
@@ -39,7 +39,7 @@ macro_rules! try_abort(
|
||||
struct Echo;
|
||||
|
||||
impl ConcurrentHandler for Echo {
|
||||
fn handle(&self, mut req: Request, mut res: Response<Fresh, HttpStream>) {
|
||||
fn handle(&self, mut req: Request, mut res: Response<Fresh>) {
|
||||
match req.uri {
|
||||
hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) {
|
||||
(&Get, "/") | (&Get, "/echo") => {
|
||||
|
||||
15
src/net.rs
15
src/net.rs
@@ -22,7 +22,7 @@ pub trait NetworkAcceptor<S: NetworkStream>: Acceptor<S> + Clone + Send {
|
||||
}
|
||||
|
||||
/// An abstraction over streams that a Server can utilize.
|
||||
pub trait NetworkStream: Stream + Clone {
|
||||
pub trait NetworkStream: Stream + Clone + Send {
|
||||
/// Get the remote address of the underlying connection.
|
||||
fn peer_name(&mut self) -> IoResult<SocketAddr>;
|
||||
|
||||
@@ -30,6 +30,19 @@ pub trait NetworkStream: Stream + Clone {
|
||||
fn connect(host: &str, port: Port) -> IoResult<Self>;
|
||||
}
|
||||
|
||||
impl Reader for Box<NetworkStream + Send> {
|
||||
#[inline]
|
||||
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.read(buf) }
|
||||
}
|
||||
|
||||
impl Writer for Box<NetworkStream + Send> {
|
||||
#[inline]
|
||||
fn write(&mut self, msg: &[u8]) -> IoResult<()> { self.write(msg) }
|
||||
|
||||
#[inline]
|
||||
fn flush(&mut self) -> IoResult<()> { self.flush() }
|
||||
}
|
||||
|
||||
/// A `NetworkListener` for `HttpStream`s.
|
||||
pub struct HttpListener {
|
||||
inner: TcpListener
|
||||
|
||||
@@ -6,7 +6,7 @@ pub use self::request::Request;
|
||||
pub use self::response::{Response, Fresh, Streaming};
|
||||
|
||||
use net::{NetworkListener, NetworkAcceptor, NetworkStream};
|
||||
use net::{HttpListener, HttpAcceptor};
|
||||
use net::HttpListener;
|
||||
|
||||
pub mod request;
|
||||
pub mod response;
|
||||
@@ -31,7 +31,6 @@ impl Server<HttpListener> {
|
||||
}
|
||||
|
||||
impl<L: NetworkListener<S, A>, S: NetworkStream, A: NetworkAcceptor<S>> Server<L> {
|
||||
|
||||
/// Creates a server that can listen for and handle `NetworkStreams`.
|
||||
pub fn new(ip: IpAddr, port: Port) -> Server<L> {
|
||||
Server {
|
||||
@@ -40,9 +39,8 @@ impl<L: NetworkListener<S, A>, S: NetworkStream, A: NetworkAcceptor<S>> Server<L
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Binds to a socket, and starts handling connections.
|
||||
pub fn listen<H: Handler<A, S> + 'static>(self, handler: H) -> IoResult<Listening<A>> {
|
||||
pub fn listen<H: Handler<A, S>>(self, handler: H) -> IoResult<Listening<A>> {
|
||||
let mut listener: L = try!(NetworkListener::bind(self.ip.to_string().as_slice(), self.port));
|
||||
let socket = try!(listener.socket_name());
|
||||
let acceptor = try!(listener.listen());
|
||||
@@ -62,12 +60,12 @@ impl<L: NetworkListener<S, A>, S: NetworkStream, A: NetworkAcceptor<S>> Server<L
|
||||
|
||||
/// An iterator over incoming connections, represented as pairs of
|
||||
/// hyper Requests and Responses.
|
||||
pub struct Incoming<'a, A: 'a = HttpAcceptor> {
|
||||
pub struct Incoming<'a, A: 'a> {
|
||||
from: IncomingConnections<'a, A>
|
||||
}
|
||||
|
||||
impl<'a, A: NetworkAcceptor<S>, S: NetworkStream> Iterator<(Request<S>, Response<Fresh, S>)> for Incoming<'a, A> {
|
||||
fn next(&mut self) -> Option<(Request<S>, Response<Fresh, S>)> {
|
||||
impl<'a, S: NetworkStream, A: NetworkAcceptor<S>> Iterator<(Request, Response<Fresh>)> for Incoming<'a, A> {
|
||||
fn next(&mut self) -> Option<(Request, Response<Fresh>)> {
|
||||
for conn in self.from {
|
||||
match conn {
|
||||
Ok(stream) => {
|
||||
@@ -96,7 +94,7 @@ impl<'a, A: NetworkAcceptor<S>, S: NetworkStream> Iterator<(Request<S>, Response
|
||||
}
|
||||
|
||||
/// A listening server, which can later be closed.
|
||||
pub struct Listening<A = HttpAcceptor> {
|
||||
pub struct Listening<A> {
|
||||
acceptor: A,
|
||||
/// The socket address that the server is bound to.
|
||||
pub socket_addr: SocketAddr,
|
||||
|
||||
@@ -12,11 +12,11 @@ use header::Headers;
|
||||
use header::common::ContentLength;
|
||||
use rfc7230::{read_request_line};
|
||||
use rfc7230::{HttpReader, SizedReader, ChunkedReader};
|
||||
use net::{NetworkStream, HttpStream};
|
||||
use net::NetworkStream;
|
||||
use uri::RequestUri;
|
||||
|
||||
/// A request bundles several parts of an incoming `NetworkStream`, given to a `Handler`.
|
||||
pub struct Request<S = HttpStream> {
|
||||
pub struct Request {
|
||||
/// The IP address of the remote connection.
|
||||
pub remote_addr: SocketAddr,
|
||||
/// The `Method`, such as `Get`, `Post`, etc.
|
||||
@@ -27,17 +27,17 @@ pub struct Request<S = HttpStream> {
|
||||
pub uri: RequestUri,
|
||||
/// The version of HTTP for this request.
|
||||
pub version: HttpVersion,
|
||||
body: HttpReader<BufferedReader<S>>
|
||||
body: HttpReader<BufferedReader<Box<NetworkStream + Send>>>
|
||||
}
|
||||
|
||||
|
||||
impl<S: NetworkStream> Request<S> {
|
||||
impl Request {
|
||||
|
||||
/// Create a new Request, reading the StartLine and Headers so they are
|
||||
/// immediately useful.
|
||||
pub fn new(mut stream: S) -> HttpResult<Request<S>> {
|
||||
pub fn new<S: NetworkStream>(mut stream: S) -> HttpResult<Request> {
|
||||
let remote_addr = try_io!(stream.peer_name());
|
||||
let mut stream = BufferedReader::new(stream);
|
||||
let mut stream = BufferedReader::new(box stream as Box<NetworkStream + Send>);
|
||||
let (method, uri, version) = try!(read_request_line(&mut stream));
|
||||
let mut headers = try!(Headers::from_raw(&mut stream));
|
||||
|
||||
@@ -61,12 +61,12 @@ impl<S: NetworkStream> Request<S> {
|
||||
uri: uri,
|
||||
headers: headers,
|
||||
version: version,
|
||||
body: body,
|
||||
body: body
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: NetworkStream> Reader for Request<S> {
|
||||
impl Reader for Request {
|
||||
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
|
||||
self.body.read(buf)
|
||||
}
|
||||
|
||||
@@ -26,18 +26,18 @@ impl WriteStatus for Streaming {}
|
||||
impl WriteStatus for Fresh {}
|
||||
|
||||
/// The outgoing half for a Tcp connection, created by a `Server` and given to a `Handler`.
|
||||
pub struct Response<W: WriteStatus, S: NetworkStream> {
|
||||
pub struct Response<W: WriteStatus> {
|
||||
/// The HTTP version of this response.
|
||||
pub version: version::HttpVersion,
|
||||
// Stream the Response is writing to, not accessible through UnwrittenResponse
|
||||
body: BufferedWriter<S>, // TODO: use a HttpWriter from rfc7230
|
||||
body: BufferedWriter<Box<NetworkStream + Send>>, // TODO: use a HttpWriter from rfc7230
|
||||
// The status code for the request.
|
||||
status: status::StatusCode,
|
||||
// The outgoing headers on this response.
|
||||
headers: header::Headers
|
||||
}
|
||||
|
||||
impl<W: WriteStatus, S: NetworkStream> Response<W, S> {
|
||||
impl<W: WriteStatus> Response<W> {
|
||||
/// The status of this response.
|
||||
#[inline]
|
||||
pub fn status(&self) -> status::StatusCode { self.status }
|
||||
@@ -47,9 +47,9 @@ impl<W: WriteStatus, S: NetworkStream> Response<W, S> {
|
||||
|
||||
/// Construct a Response from its constituent parts.
|
||||
pub fn construct(version: version::HttpVersion,
|
||||
body: BufferedWriter<S>,
|
||||
body: BufferedWriter<Box<NetworkStream + Send>>,
|
||||
status: status::StatusCode,
|
||||
headers: header::Headers) -> Response<Fresh, S> {
|
||||
headers: header::Headers) -> Response<Fresh> {
|
||||
Response {
|
||||
status: status,
|
||||
version: version,
|
||||
@@ -59,19 +59,19 @@ impl<W: WriteStatus, S: NetworkStream> Response<W, S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: NetworkStream> Response<Fresh, S> {
|
||||
impl Response<Fresh> {
|
||||
/// Creates a new Response that can be used to write to a network stream.
|
||||
pub fn new(stream: S) -> Response<Fresh, S> {
|
||||
pub fn new<S: NetworkStream>(stream: S) -> Response<Fresh> {
|
||||
Response {
|
||||
status: status::Ok,
|
||||
version: version::Http11,
|
||||
headers: header::Headers::new(),
|
||||
body: BufferedWriter::new(stream)
|
||||
body: BufferedWriter::new(box stream as Box<NetworkStream + Send>)
|
||||
}
|
||||
}
|
||||
|
||||
/// Consume this Response<Fresh>, writing the Headers and Status and creating a Response<Streaming>
|
||||
pub fn start(mut self) -> IoResult<Response<Streaming, S>> {
|
||||
pub fn start(mut self) -> IoResult<Response<Streaming>> {
|
||||
debug!("writing head: {} {}", self.version, self.status);
|
||||
try!(write!(self.body, "{} {}{}{}", self.version, self.status, CR as char, LF as char));
|
||||
|
||||
@@ -104,12 +104,13 @@ impl<S: NetworkStream> Response<Fresh, S> {
|
||||
pub fn headers_mut(&mut self) -> &mut header::Headers { &mut self.headers }
|
||||
|
||||
/// Deconstruct this Response into its constituent parts.
|
||||
pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter<S>, status::StatusCode, header::Headers) {
|
||||
pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter<Box<NetworkStream + Send>>,
|
||||
status::StatusCode, header::Headers) {
|
||||
(self.version, self.body, self.status, self.headers)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: NetworkStream> Response<Streaming, S> {
|
||||
impl Response<Streaming> {
|
||||
/// Flushes all writing of a response to the client.
|
||||
pub fn end(mut self) -> IoResult<()> {
|
||||
debug!("ending");
|
||||
@@ -117,7 +118,7 @@ impl<S: NetworkStream> Response<Streaming, S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: NetworkStream> Writer for Response<Streaming, S> {
|
||||
impl Writer for Response<Streaming> {
|
||||
fn write(&mut self, msg: &[u8]) -> IoResult<()> {
|
||||
debug!("write {:u} bytes", msg.len());
|
||||
self.body.write(msg)
|
||||
|
||||
Reference in New Issue
Block a user