From 0285fc2acc6c6f3bbb63ce72691e0d1ee0cdc9ff Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Sun, 7 Sep 2014 14:18:51 -0700 Subject: [PATCH] Abstract out NetworkStream This introduces a new Trait, NetworkStream, which abstracts over the functionality provided by TcpStream so that it can be easily mocked and extended in testing and hyper can be used for other connection sources. --- examples/concurrent-server.rs | 11 ++-- examples/server.rs | 52 +++++++-------- src/client/request.rs | 19 +++--- src/client/response.rs | 29 ++++---- src/lib.rs | 5 +- src/net.rs | 120 ++++++++++++++++++++++++++++++++++ src/server/mod.rs | 59 ++++++++++------- src/server/request.rs | 26 ++++---- src/server/response.rs | 30 ++++----- 9 files changed, 240 insertions(+), 111 deletions(-) create mode 100644 src/net.rs diff --git a/examples/concurrent-server.rs b/examples/concurrent-server.rs index 0fb4c38f..3bd371ad 100644 --- a/examples/concurrent-server.rs +++ b/examples/concurrent-server.rs @@ -1,4 +1,4 @@ -#![feature(macro_rules)] +#![feature(macro_rules, default_type_params)] extern crate hyper; extern crate debug; @@ -10,15 +10,16 @@ use std::sync::Arc; use hyper::{Get, Post}; use hyper::server::{Server, Handler, Incoming, Request, Response, Fresh}; use hyper::header::common::ContentLength; +use hyper::net::{HttpStream, HttpAcceptor}; trait ConcurrentHandler: Send + Sync { - fn handle(&self, req: Request, res: Response); + fn handle(&self, req: Request, res: Response); } struct Concurrent { handler: Arc } -impl Handler for Concurrent { - fn handle(self, mut incoming: Incoming) { +impl Handler for Concurrent { + fn handle(self, mut incoming: Incoming) { for (mut req, mut res) in incoming { let clone = self.handler.clone(); spawn(proc() { clone.handle(req, res) }) @@ -38,7 +39,7 @@ macro_rules! try_abort( struct Echo; impl ConcurrentHandler for Echo { - fn handle(&self, mut req: Request, mut res: Response) { + fn handle(&self, mut req: Request, mut res: Response) { match req.uri { hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) { (&Get, "/") | (&Get, "/echo") => { diff --git a/examples/server.rs b/examples/server.rs index e70db4fc..09069515 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -7,10 +7,8 @@ use std::io::util::copy; use std::io::net::ip::Ipv4Addr; use hyper::{Get, Post}; -use hyper::server::{Server, Handler, Incoming}; use hyper::header::common::ContentLength; - -struct Echo; +use hyper::server::{Server, Incoming}; macro_rules! try_continue( ($e:expr) => {{ @@ -21,41 +19,39 @@ macro_rules! try_continue( }} ) -impl Handler for Echo { - fn handle(self, mut incoming: Incoming) { - for (mut req, mut res) in incoming { - match req.uri { - hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) { - (&Get, "/") | (&Get, "/echo") => { - let out = b"Try POST /echo"; +fn echo(mut incoming: Incoming) { + for (mut req, mut res) in incoming { + match req.uri { + hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) { + (&Get, "/") | (&Get, "/echo") => { + let out = b"Try POST /echo"; - res.headers_mut().set(ContentLength(out.len())); - let mut res = try_continue!(res.start()); - try_continue!(res.write(out)); - try_continue!(res.end()); - continue; - }, - (&Post, "/echo") => (), // fall through, fighting mutable borrows - _ => { - *res.status_mut() = hyper::status::NotFound; - try_continue!(res.start().and_then(|res| res.end())); - continue; - } + res.headers_mut().set(ContentLength(out.len())); + let mut res = try_continue!(res.start()); + try_continue!(res.write(out)); + try_continue!(res.end()); + continue; }, + (&Post, "/echo") => (), // fall through, fighting mutable borrows _ => { + *res.status_mut() = hyper::status::NotFound; try_continue!(res.start().and_then(|res| res.end())); continue; } - }; + }, + _ => { + try_continue!(res.start().and_then(|res| res.end())); + continue; + } + }; - let mut res = try_continue!(res.start()); - try_continue!(copy(&mut req, &mut res)); - try_continue!(res.end()); - } + let mut res = try_continue!(res.start()); + try_continue!(copy(&mut req, &mut res)); + try_continue!(res.end()); } } fn main() { let server = Server::http(Ipv4Addr(127, 0, 0, 1), 1337); - server.listen(Echo).unwrap(); + server.listen(echo).unwrap(); } diff --git a/src/client/request.rs b/src/client/request.rs index c422a942..cc5e53f2 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -1,5 +1,4 @@ //! Client Requests -use std::io::net::tcp::TcpStream; use std::io::{BufferedWriter, IoResult}; use url::Url; @@ -7,6 +6,7 @@ use url::Url; use method; use header::Headers; use header::common::Host; +use net::{NetworkStream, HttpStream}; use rfc7230::LINE_ENDING; use version; use {HttpResult, HttpUriError}; @@ -14,7 +14,7 @@ use super::{Response}; /// A client request to a remote server. -pub struct Request { +pub struct Request { /// The method of this request. pub method: method::Method, /// The headers that will be sent with this request. @@ -24,13 +24,13 @@ pub struct Request { /// The HTTP version of this request. pub version: version::HttpVersion, headers_written: bool, - body: BufferedWriter, + body: BufferedWriter, } -impl Request { +impl Request { /// Create a new client request. - pub fn new(method: method::Method, url: Url) -> HttpResult { + pub fn new(method: method::Method, url: Url) -> HttpResult> { debug!("{} {}", method, url); let host = match url.serialize_host() { Some(host) => host, @@ -43,7 +43,7 @@ impl Request { }; debug!("port={}", port); - let stream = try_io!(TcpStream::connect(host.as_slice(), port)); + let stream = try_io!(NetworkStream::connect(host.as_slice(), port)); let stream = BufferedWriter::new(stream); let mut headers = Headers::new(); headers.set(Host(host)); @@ -82,16 +82,15 @@ impl Request { /// Completes writing the request, and returns a response to read from. /// /// Consumes the Request. - pub fn send(mut self) -> HttpResult { + pub fn send(mut self) -> HttpResult> { try_io!(self.flush()); - let mut raw = self.body.unwrap(); - try_io!(raw.close_write()); + let raw = self.body.unwrap(); Response::new(raw) } } -impl Writer for Request { +impl Writer for Request { fn write(&mut self, msg: &[u8]) -> IoResult<()> { if !self.headers_written { try!(self.write_head()); diff --git a/src/client/response.rs b/src/client/response.rs index bf39407b..2e6f5eed 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -1,33 +1,33 @@ //! Client Responses use std::io::{BufferedReader, IoResult}; -use std::io::net::tcp::TcpStream; use header; use header::common::{ContentLength, TransferEncoding}; use header::common::transfer_encoding::Chunked; +use net::{NetworkStream, HttpStream}; use rfc7230::{read_status_line, HttpReader, SizedReader, ChunkedReader, EofReader}; use status; use version; use {HttpResult}; /// A response for a client request to a remote server. -pub struct Response { +pub struct Response { /// The status from the server. pub status: status::StatusCode, /// The headers from the server. pub headers: header::Headers, /// The HTTP version of this response from the server. pub version: version::HttpVersion, - body: HttpReader>, + body: HttpReader>, } -impl Response { +impl Response { /// Creates a new response from a server. - pub fn new(tcp: TcpStream) -> HttpResult { - let mut tcp = BufferedReader::new(tcp); - let (version, status) = try!(read_status_line(&mut tcp)); - let mut headers = try!(header::Headers::from_raw(&mut tcp)); + pub fn new(stream: S) -> HttpResult> { + let mut stream = BufferedReader::new(stream); + let (version, status) = try!(read_status_line(&mut stream)); + let mut headers = try!(header::Headers::from_raw(&mut stream)); debug!("{} {}", version, status); debug!("{}", headers); @@ -40,22 +40,22 @@ impl Response { }; if codings.contains(&Chunked) { - ChunkedReader(tcp, None) + ChunkedReader(stream, None) } else { - debug!("not chucked. read till eof"); - EofReader(tcp) + debug!("not chuncked. read till eof"); + EofReader(stream) } } None => unreachable!() } } else if headers.has::() { match headers.get_ref::() { - Some(&ContentLength(len)) => SizedReader(tcp, len), + Some(&ContentLength(len)) => SizedReader(stream, len), None => unreachable!() } } else { debug!("neither Transfer-Encoding nor Content-Length"); - EofReader(tcp) + EofReader(stream) }; Ok(Response { @@ -67,7 +67,8 @@ impl Response { } } -impl Reader for Response { +impl Reader for Response { + #[inline] fn read(&mut self, buf: &mut [u8]) -> IoResult { self.body.read(buf) } diff --git a/src/lib.rs b/src/lib.rs index 5beb9b8c..44a052c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ //! # hyper -#![feature(macro_rules, phase)] +#![feature(macro_rules, phase, default_type_params)] #![warn(missing_doc)] -#![deny(warnings)] +//#![deny(warnings)] #![experimental] extern crate time; @@ -53,6 +53,7 @@ macro_rules! trace( pub mod client; pub mod method; pub mod header; +pub mod net; pub mod server; pub mod status; pub mod uri; diff --git a/src/net.rs b/src/net.rs new file mode 100644 index 00000000..44d6e6b7 --- /dev/null +++ b/src/net.rs @@ -0,0 +1,120 @@ +//! A collection of traits abstracting over Listeners and Streams. +use std::io::{IoResult, Stream, Listener, Acceptor}; +use std::io::net::ip::{SocketAddr, Port}; +use std::io::net::tcp::{TcpStream, TcpListener, TcpAcceptor}; + +/// An abstraction to listen for connections on a certain port. +pub trait NetworkListener>: Listener { + /// Bind to a socket. + /// + /// Note: This does not start listening for connections. You must call + /// `listen()` to do that. + fn bind(host: &str, port: Port) -> IoResult; + + /// Get the address this Listener ended up listening on. + fn socket_name(&mut self) -> IoResult; +} + +/// An abstraction to receive `HttpStream`s. +pub trait NetworkAcceptor: Acceptor + Clone + Send { + /// Closes the Acceptor, so no more incoming connections will be handled. + fn close(&mut self) -> IoResult<()>; +} + +/// An abstraction over streams that a Server can utilize. +pub trait NetworkStream: Stream + Clone { + /// Get the remote address of the underlying connection. + fn peer_name(&mut self) -> IoResult; + + /// Connect to a remote address. + fn connect(host: &str, port: Port) -> IoResult; +} + +/// A `NetworkListener` for `HttpStream`s. +pub struct HttpListener { + inner: TcpListener +} + +impl Listener for HttpListener { + #[inline] + fn listen(self) -> IoResult { + Ok(HttpAcceptor { + inner: try!(self.inner.listen()) + }) + } +} + +impl NetworkListener for HttpListener { + #[inline] + fn bind(host: &str, port: Port) -> IoResult { + Ok(HttpListener { + inner: try!(TcpListener::bind(host, port)) + }) + } + + #[inline] + fn socket_name(&mut self) -> IoResult { + self.inner.socket_name() + } +} + +/// A `NetworkAcceptor` for `HttpStream`s. +#[deriving(Clone)] +pub struct HttpAcceptor { + inner: TcpAcceptor +} + +impl Acceptor for HttpAcceptor { + #[inline] + fn accept(&mut self) -> IoResult { + Ok(HttpStream { + inner: try!(self.inner.accept()) + }) + } +} + +impl NetworkAcceptor for HttpAcceptor { + #[inline] + fn close(&mut self) -> IoResult<()> { + self.inner.close_accept() + } +} + +/// A wrapper around a TcpStream. +#[deriving(Clone)] +pub struct HttpStream { + inner: TcpStream +} + +impl Reader for HttpStream { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> IoResult { + self.inner.read(buf) + } +} + +impl Writer for HttpStream { + #[inline] + fn write(&mut self, msg: &[u8]) -> IoResult<()> { + self.inner.write(msg) + } + #[inline] + fn flush(&mut self) -> IoResult<()> { + self.inner.flush() + } +} + + +impl NetworkStream for HttpStream { + #[inline] + fn peer_name(&mut self) -> IoResult { + self.inner.peer_name() + } + + #[inline] + fn connect(host: &str, port: Port) -> IoResult { + Ok(HttpStream { + inner: try!(TcpStream::connect(host, port)) + }) + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index e33b3902..62ad2554 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,11 +1,13 @@ //! HTTP Server -use std::io::net::tcp::{TcpListener, TcpAcceptor}; use std::io::{Acceptor, Listener, IoResult, EndOfFile, IncomingConnections}; use std::io::net::ip::{IpAddr, Port, SocketAddr}; pub use self::request::Request; pub use self::response::{Response, Fresh, Streaming}; +use net::{NetworkListener, NetworkAcceptor, NetworkStream}; +use net::{HttpListener, HttpAcceptor}; + pub mod request; pub mod response; @@ -13,32 +15,41 @@ pub mod response; /// /// Once listening, it will create a `Request`/`Response` pair for each /// incoming connection, and hand them to the provided handler. -pub struct Server { +pub struct Server { ip: IpAddr, port: Port } - -impl Server { - - /// Creates a server to be used for `http` conenctions. +impl Server { + /// Creates a new server that will handle `HttpStream`s. pub fn http(ip: IpAddr, port: Port) -> Server { Server { ip: ip, port: port } } +} + +impl, S: NetworkStream, A: NetworkAcceptor> Server { + + /// Creates a server that can listen for and handle `NetworkStreams`. + pub fn new(ip: IpAddr, port: Port) -> Server { + Server { + ip: ip, + port: port + } + } + /// Binds to a socket, and starts handling connections. - pub fn listen(self, handler: H) -> IoResult { - let mut listener = try!(TcpListener::bind(self.ip.to_string().as_slice(), self.port)); + pub fn listen + 'static>(self, handler: H) -> IoResult> { + let mut listener: L = try!(NetworkListener::bind(self.ip.to_string().as_slice(), self.port)); let socket = try!(listener.socket_name()); let acceptor = try!(listener.listen()); - let worker = acceptor.clone(); + let mut worker = acceptor.clone(); spawn(proc() { - let mut acceptor = worker; - handler.handle(Incoming { from: acceptor.incoming() }); + handler.handle(Incoming { from: worker.incoming() }); }); Ok(Listening { @@ -51,12 +62,12 @@ impl Server { /// An iterator over incoming connections, represented as pairs of /// hyper Requests and Responses. -pub struct Incoming<'a> { - from: IncomingConnections<'a, TcpAcceptor> +pub struct Incoming<'a, A: 'a = HttpAcceptor> { + from: IncomingConnections<'a, A> } -impl<'a> Iterator<(Request, Response)> for Incoming<'a> { - fn next(&mut self) -> Option<(Request, Response)> { +impl<'a, A: NetworkAcceptor, S: NetworkStream> Iterator<(Request, Response)> for Incoming<'a, A> { + fn next(&mut self) -> Option<(Request, Response)> { for conn in self.from { match conn { Ok(stream) => { @@ -85,30 +96,30 @@ impl<'a> Iterator<(Request, Response)> for Incoming<'a> { } /// A listening server, which can later be closed. -pub struct Listening { - acceptor: TcpAcceptor, +pub struct Listening { + acceptor: A, /// The socket address that the server is bound to. pub socket_addr: SocketAddr, } -impl Listening { - /// Stop the server from listening to its socket address. +impl, S: NetworkStream> Listening { + /// Stop the server from listening to it's socket address. pub fn close(mut self) -> IoResult<()> { debug!("closing server"); - self.acceptor.close_accept() + self.acceptor.close() } } /// A handler that can handle incoming requests for a server. -pub trait Handler: Send { +pub trait Handler, S: NetworkStream>: Send { /// Receives a `Request`/`Response` pair, and should perform some action on them. /// /// This could reading from the request, and writing to the response. - fn handle(self, Incoming); + fn handle(self, Incoming); } -impl Handler for fn(Incoming) { - fn handle(self, incoming: Incoming) { +impl, S: NetworkStream> Handler for fn(Incoming) { + fn handle(self, incoming: Incoming) { (self)(incoming) } } diff --git a/src/server/request.rs b/src/server/request.rs index 71805280..2480f6b6 100644 --- a/src/server/request.rs +++ b/src/server/request.rs @@ -4,7 +4,6 @@ //! target URI, headers, and message body. use std::io::{Reader, BufferedReader, IoResult}; use std::io::net::ip::SocketAddr; -use std::io::net::tcp::TcpStream; use {HttpResult}; use version::{HttpVersion}; @@ -13,10 +12,11 @@ use header::Headers; use header::common::ContentLength; use rfc7230::{read_request_line}; use rfc7230::{HttpReader, SizedReader, ChunkedReader}; +use net::{NetworkStream, HttpStream}; use uri::RequestUri; -/// A request bundles several parts of an incoming TCP stream, given to a `Handler`. -pub struct Request { +/// A request bundles several parts of an incoming `NetworkStream`, given to a `Handler`. +pub struct Request { /// The IP address of the remote connection. pub remote_addr: SocketAddr, /// The `Method`, such as `Get`, `Post`, etc. @@ -27,19 +27,19 @@ pub struct Request { pub uri: RequestUri, /// The version of HTTP for this request. pub version: HttpVersion, - body: HttpReader> + body: HttpReader> } -impl Request { +impl Request { /// Create a new Request, reading the StartLine and Headers so they are /// immediately useful. - pub fn new(mut tcp: TcpStream) -> HttpResult { - let remote_addr = try_io!(tcp.peer_name()); - let mut tcp = BufferedReader::new(tcp); - let (method, uri, version) = try!(read_request_line(&mut tcp)); - let mut headers = try!(Headers::from_raw(&mut tcp)); + pub fn new(mut stream: S) -> HttpResult> { + let remote_addr = try_io!(stream.peer_name()); + let mut stream = BufferedReader::new(stream); + let (method, uri, version) = try!(read_request_line(&mut stream)); + let mut headers = try!(Headers::from_raw(&mut stream)); debug!("{} {} {}", method, uri, version); debug!("{}", headers); @@ -47,12 +47,12 @@ impl Request { let body = if headers.has::() { match headers.get_ref::() { - Some(&ContentLength(len)) => SizedReader(tcp, len), + Some(&ContentLength(len)) => SizedReader(stream, len), None => unreachable!() } } else { todo!("check for Transfer-Encoding: chunked"); - ChunkedReader(tcp, None) + ChunkedReader(stream, None) }; Ok(Request { @@ -66,7 +66,7 @@ impl Request { } } -impl Reader for Request { +impl Reader for Request { fn read(&mut self, buf: &mut [u8]) -> IoResult { self.body.read(buf) } diff --git a/src/server/response.rs b/src/server/response.rs index ab3dc81e..3ae34d67 100644 --- a/src/server/response.rs +++ b/src/server/response.rs @@ -3,15 +3,15 @@ //! These are responses sent by a `hyper::Server` to clients, after //! receiving a request. use std::io::{BufferedWriter, IoResult}; -use std::io::net::tcp::TcpStream; use time::now_utc; use header; use header::common; -use status; -use version; use rfc7230::{CR, LF, LINE_ENDING}; +use status; +use net::NetworkStream; +use version; /// Phantom type indicating Headers and StatusCode have not been written. pub struct Fresh; @@ -26,18 +26,18 @@ impl WriteStatus for Streaming {} impl WriteStatus for Fresh {} /// The outgoing half for a Tcp connection, created by a `Server` and given to a `Handler`. -pub struct Response { +pub struct Response { /// The HTTP version of this response. pub version: version::HttpVersion, // Stream the Response is writing to, not accessible through UnwrittenResponse - body: BufferedWriter, // TODO: use a HttpWriter from rfc7230 + body: BufferedWriter, // TODO: use a HttpWriter from rfc7230 // The status code for the request. status: status::StatusCode, // The outgoing headers on this response. headers: header::Headers } -impl Response { +impl Response { /// The status of this response. #[inline] pub fn status(&self) -> status::StatusCode { self.status } @@ -47,9 +47,9 @@ impl Response { /// Construct a Response from its constituent parts. pub fn construct(version: version::HttpVersion, - body: BufferedWriter, + body: BufferedWriter, status: status::StatusCode, - headers: header::Headers) -> Response { + headers: header::Headers) -> Response { Response { status: status, version: version, @@ -59,19 +59,19 @@ impl Response { } } -impl Response { +impl Response { /// Creates a new Response that can be used to write to a network stream. - pub fn new(tcp: TcpStream) -> Response { + pub fn new(stream: S) -> Response { Response { status: status::Ok, version: version::Http11, headers: header::Headers::new(), - body: BufferedWriter::new(tcp) + body: BufferedWriter::new(stream) } } /// Consume this Response, writing the Headers and Status and creating a Response - pub fn start(mut self) -> IoResult> { + pub fn start(mut self) -> IoResult> { debug!("writing head: {} {}", self.version, self.status); try!(write!(self.body, "{} {}{}{}", self.version, self.status, CR as char, LF as char)); @@ -104,12 +104,12 @@ impl Response { pub fn headers_mut(&mut self) -> &mut header::Headers { &mut self.headers } /// Deconstruct this Response into its constituent parts. - pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter, status::StatusCode, header::Headers) { + pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter, status::StatusCode, header::Headers) { (self.version, self.body, self.status, self.headers) } } -impl Response { +impl Response { /// Flushes all writing of a response to the client. pub fn end(mut self) -> IoResult<()> { debug!("ending"); @@ -117,7 +117,7 @@ impl Response { } } -impl Writer for Response { +impl Writer for Response { fn write(&mut self, msg: &[u8]) -> IoResult<()> { debug!("write {:u} bytes", msg.len()); self.body.write(msg)