Abstract out NetworkStream

This introduces a new Trait, NetworkStream, which abstracts over
the functionality provided by TcpStream so that it can be easily
mocked and extended in testing and hyper can be used for
other connection sources.
This commit is contained in:
Sean McArthur
2014-09-07 14:18:51 -07:00
committed by Jonathan Reem
parent a8d7b681da
commit 0285fc2acc
9 changed files with 240 additions and 111 deletions

View File

@@ -1,4 +1,4 @@
#![feature(macro_rules)]
#![feature(macro_rules, default_type_params)]
extern crate hyper;
extern crate debug;
@@ -10,15 +10,16 @@ use std::sync::Arc;
use hyper::{Get, Post};
use hyper::server::{Server, Handler, Incoming, Request, Response, Fresh};
use hyper::header::common::ContentLength;
use hyper::net::{HttpStream, HttpAcceptor};
trait ConcurrentHandler: Send + Sync {
fn handle(&self, req: Request, res: Response<Fresh>);
fn handle(&self, req: Request, res: Response<Fresh, HttpStream>);
}
struct Concurrent<H: ConcurrentHandler> { handler: Arc<H> }
impl<H: ConcurrentHandler> Handler for Concurrent<H> {
fn handle(self, mut incoming: Incoming) {
impl<H: ConcurrentHandler> Handler<HttpAcceptor, HttpStream> for Concurrent<H> {
fn handle(self, mut incoming: Incoming<HttpAcceptor>) {
for (mut req, mut res) in incoming {
let clone = self.handler.clone();
spawn(proc() { clone.handle(req, res) })
@@ -38,7 +39,7 @@ macro_rules! try_abort(
struct Echo;
impl ConcurrentHandler for Echo {
fn handle(&self, mut req: Request, mut res: Response<Fresh>) {
fn handle(&self, mut req: Request, mut res: Response<Fresh, HttpStream>) {
match req.uri {
hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) {
(&Get, "/") | (&Get, "/echo") => {

View File

@@ -7,10 +7,8 @@ use std::io::util::copy;
use std::io::net::ip::Ipv4Addr;
use hyper::{Get, Post};
use hyper::server::{Server, Handler, Incoming};
use hyper::header::common::ContentLength;
struct Echo;
use hyper::server::{Server, Incoming};
macro_rules! try_continue(
($e:expr) => {{
@@ -21,41 +19,39 @@ macro_rules! try_continue(
}}
)
impl Handler for Echo {
fn handle(self, mut incoming: Incoming) {
for (mut req, mut res) in incoming {
match req.uri {
hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) {
(&Get, "/") | (&Get, "/echo") => {
let out = b"Try POST /echo";
fn echo(mut incoming: Incoming) {
for (mut req, mut res) in incoming {
match req.uri {
hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) {
(&Get, "/") | (&Get, "/echo") => {
let out = b"Try POST /echo";
res.headers_mut().set(ContentLength(out.len()));
let mut res = try_continue!(res.start());
try_continue!(res.write(out));
try_continue!(res.end());
continue;
},
(&Post, "/echo") => (), // fall through, fighting mutable borrows
_ => {
*res.status_mut() = hyper::status::NotFound;
try_continue!(res.start().and_then(|res| res.end()));
continue;
}
res.headers_mut().set(ContentLength(out.len()));
let mut res = try_continue!(res.start());
try_continue!(res.write(out));
try_continue!(res.end());
continue;
},
(&Post, "/echo") => (), // fall through, fighting mutable borrows
_ => {
*res.status_mut() = hyper::status::NotFound;
try_continue!(res.start().and_then(|res| res.end()));
continue;
}
};
},
_ => {
try_continue!(res.start().and_then(|res| res.end()));
continue;
}
};
let mut res = try_continue!(res.start());
try_continue!(copy(&mut req, &mut res));
try_continue!(res.end());
}
let mut res = try_continue!(res.start());
try_continue!(copy(&mut req, &mut res));
try_continue!(res.end());
}
}
fn main() {
let server = Server::http(Ipv4Addr(127, 0, 0, 1), 1337);
server.listen(Echo).unwrap();
server.listen(echo).unwrap();
}

View File

@@ -1,5 +1,4 @@
//! Client Requests
use std::io::net::tcp::TcpStream;
use std::io::{BufferedWriter, IoResult};
use url::Url;
@@ -7,6 +6,7 @@ use url::Url;
use method;
use header::Headers;
use header::common::Host;
use net::{NetworkStream, HttpStream};
use rfc7230::LINE_ENDING;
use version;
use {HttpResult, HttpUriError};
@@ -14,7 +14,7 @@ use super::{Response};
/// A client request to a remote server.
pub struct Request {
pub struct Request<S = HttpStream> {
/// The method of this request.
pub method: method::Method,
/// The headers that will be sent with this request.
@@ -24,13 +24,13 @@ pub struct Request {
/// The HTTP version of this request.
pub version: version::HttpVersion,
headers_written: bool,
body: BufferedWriter<TcpStream>,
body: BufferedWriter<S>,
}
impl Request {
impl<S: NetworkStream> Request<S> {
/// Create a new client request.
pub fn new(method: method::Method, url: Url) -> HttpResult<Request> {
pub fn new(method: method::Method, url: Url) -> HttpResult<Request<S>> {
debug!("{} {}", method, url);
let host = match url.serialize_host() {
Some(host) => host,
@@ -43,7 +43,7 @@ impl Request {
};
debug!("port={}", port);
let stream = try_io!(TcpStream::connect(host.as_slice(), port));
let stream = try_io!(NetworkStream::connect(host.as_slice(), port));
let stream = BufferedWriter::new(stream);
let mut headers = Headers::new();
headers.set(Host(host));
@@ -82,16 +82,15 @@ impl Request {
/// Completes writing the request, and returns a response to read from.
///
/// Consumes the Request.
pub fn send(mut self) -> HttpResult<Response> {
pub fn send(mut self) -> HttpResult<Response<S>> {
try_io!(self.flush());
let mut raw = self.body.unwrap();
try_io!(raw.close_write());
let raw = self.body.unwrap();
Response::new(raw)
}
}
impl Writer for Request {
impl<S: NetworkStream> Writer for Request<S> {
fn write(&mut self, msg: &[u8]) -> IoResult<()> {
if !self.headers_written {
try!(self.write_head());

View File

@@ -1,33 +1,33 @@
//! Client Responses
use std::io::{BufferedReader, IoResult};
use std::io::net::tcp::TcpStream;
use header;
use header::common::{ContentLength, TransferEncoding};
use header::common::transfer_encoding::Chunked;
use net::{NetworkStream, HttpStream};
use rfc7230::{read_status_line, HttpReader, SizedReader, ChunkedReader, EofReader};
use status;
use version;
use {HttpResult};
/// A response for a client request to a remote server.
pub struct Response {
pub struct Response<S = HttpStream> {
/// The status from the server.
pub status: status::StatusCode,
/// The headers from the server.
pub headers: header::Headers,
/// The HTTP version of this response from the server.
pub version: version::HttpVersion,
body: HttpReader<BufferedReader<TcpStream>>,
body: HttpReader<BufferedReader<S>>,
}
impl Response {
impl<S: NetworkStream> Response<S> {
/// Creates a new response from a server.
pub fn new(tcp: TcpStream) -> HttpResult<Response> {
let mut tcp = BufferedReader::new(tcp);
let (version, status) = try!(read_status_line(&mut tcp));
let mut headers = try!(header::Headers::from_raw(&mut tcp));
pub fn new(stream: S) -> HttpResult<Response<S>> {
let mut stream = BufferedReader::new(stream);
let (version, status) = try!(read_status_line(&mut stream));
let mut headers = try!(header::Headers::from_raw(&mut stream));
debug!("{} {}", version, status);
debug!("{}", headers);
@@ -40,22 +40,22 @@ impl Response {
};
if codings.contains(&Chunked) {
ChunkedReader(tcp, None)
ChunkedReader(stream, None)
} else {
debug!("not chucked. read till eof");
EofReader(tcp)
debug!("not chuncked. read till eof");
EofReader(stream)
}
}
None => unreachable!()
}
} else if headers.has::<ContentLength>() {
match headers.get_ref::<ContentLength>() {
Some(&ContentLength(len)) => SizedReader(tcp, len),
Some(&ContentLength(len)) => SizedReader(stream, len),
None => unreachable!()
}
} else {
debug!("neither Transfer-Encoding nor Content-Length");
EofReader(tcp)
EofReader(stream)
};
Ok(Response {
@@ -67,7 +67,8 @@ impl Response {
}
}
impl Reader for Response {
impl<S: NetworkStream> Reader for Response<S> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.body.read(buf)
}

View File

@@ -1,7 +1,7 @@
//! # hyper
#![feature(macro_rules, phase)]
#![feature(macro_rules, phase, default_type_params)]
#![warn(missing_doc)]
#![deny(warnings)]
//#![deny(warnings)]
#![experimental]
extern crate time;
@@ -53,6 +53,7 @@ macro_rules! trace(
pub mod client;
pub mod method;
pub mod header;
pub mod net;
pub mod server;
pub mod status;
pub mod uri;

120
src/net.rs Normal file
View File

@@ -0,0 +1,120 @@
//! A collection of traits abstracting over Listeners and Streams.
use std::io::{IoResult, Stream, Listener, Acceptor};
use std::io::net::ip::{SocketAddr, Port};
use std::io::net::tcp::{TcpStream, TcpListener, TcpAcceptor};
/// An abstraction to listen for connections on a certain port.
pub trait NetworkListener<S: NetworkStream, A: NetworkAcceptor<S>>: Listener<S, A> {
/// Bind to a socket.
///
/// Note: This does not start listening for connections. You must call
/// `listen()` to do that.
fn bind(host: &str, port: Port) -> IoResult<Self>;
/// Get the address this Listener ended up listening on.
fn socket_name(&mut self) -> IoResult<SocketAddr>;
}
/// An abstraction to receive `HttpStream`s.
pub trait NetworkAcceptor<S: NetworkStream>: Acceptor<S> + Clone + Send {
/// Closes the Acceptor, so no more incoming connections will be handled.
fn close(&mut self) -> IoResult<()>;
}
/// An abstraction over streams that a Server can utilize.
pub trait NetworkStream: Stream + Clone {
/// Get the remote address of the underlying connection.
fn peer_name(&mut self) -> IoResult<SocketAddr>;
/// Connect to a remote address.
fn connect(host: &str, port: Port) -> IoResult<Self>;
}
/// A `NetworkListener` for `HttpStream`s.
pub struct HttpListener {
inner: TcpListener
}
impl Listener<HttpStream, HttpAcceptor> for HttpListener {
#[inline]
fn listen(self) -> IoResult<HttpAcceptor> {
Ok(HttpAcceptor {
inner: try!(self.inner.listen())
})
}
}
impl NetworkListener<HttpStream, HttpAcceptor> for HttpListener {
#[inline]
fn bind(host: &str, port: Port) -> IoResult<HttpListener> {
Ok(HttpListener {
inner: try!(TcpListener::bind(host, port))
})
}
#[inline]
fn socket_name(&mut self) -> IoResult<SocketAddr> {
self.inner.socket_name()
}
}
/// A `NetworkAcceptor` for `HttpStream`s.
#[deriving(Clone)]
pub struct HttpAcceptor {
inner: TcpAcceptor
}
impl Acceptor<HttpStream> for HttpAcceptor {
#[inline]
fn accept(&mut self) -> IoResult<HttpStream> {
Ok(HttpStream {
inner: try!(self.inner.accept())
})
}
}
impl NetworkAcceptor<HttpStream> for HttpAcceptor {
#[inline]
fn close(&mut self) -> IoResult<()> {
self.inner.close_accept()
}
}
/// A wrapper around a TcpStream.
#[deriving(Clone)]
pub struct HttpStream {
inner: TcpStream
}
impl Reader for HttpStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.inner.read(buf)
}
}
impl Writer for HttpStream {
#[inline]
fn write(&mut self, msg: &[u8]) -> IoResult<()> {
self.inner.write(msg)
}
#[inline]
fn flush(&mut self) -> IoResult<()> {
self.inner.flush()
}
}
impl NetworkStream for HttpStream {
#[inline]
fn peer_name(&mut self) -> IoResult<SocketAddr> {
self.inner.peer_name()
}
#[inline]
fn connect(host: &str, port: Port) -> IoResult<HttpStream> {
Ok(HttpStream {
inner: try!(TcpStream::connect(host, port))
})
}
}

View File

@@ -1,11 +1,13 @@
//! HTTP Server
use std::io::net::tcp::{TcpListener, TcpAcceptor};
use std::io::{Acceptor, Listener, IoResult, EndOfFile, IncomingConnections};
use std::io::net::ip::{IpAddr, Port, SocketAddr};
pub use self::request::Request;
pub use self::response::{Response, Fresh, Streaming};
use net::{NetworkListener, NetworkAcceptor, NetworkStream};
use net::{HttpListener, HttpAcceptor};
pub mod request;
pub mod response;
@@ -13,32 +15,41 @@ pub mod response;
///
/// Once listening, it will create a `Request`/`Response` pair for each
/// incoming connection, and hand them to the provided handler.
pub struct Server {
pub struct Server<L = HttpListener> {
ip: IpAddr,
port: Port
}
impl Server {
/// Creates a server to be used for `http` conenctions.
impl Server<HttpListener> {
/// Creates a new server that will handle `HttpStream`s.
pub fn http(ip: IpAddr, port: Port) -> Server {
Server {
ip: ip,
port: port
}
}
}
impl<L: NetworkListener<S, A>, S: NetworkStream, A: NetworkAcceptor<S>> Server<L> {
/// Creates a server that can listen for and handle `NetworkStreams`.
pub fn new(ip: IpAddr, port: Port) -> Server<L> {
Server {
ip: ip,
port: port
}
}
/// Binds to a socket, and starts handling connections.
pub fn listen<H: Handler + 'static>(self, handler: H) -> IoResult<Listening> {
let mut listener = try!(TcpListener::bind(self.ip.to_string().as_slice(), self.port));
pub fn listen<H: Handler<A, S> + 'static>(self, handler: H) -> IoResult<Listening<A>> {
let mut listener: L = try!(NetworkListener::bind(self.ip.to_string().as_slice(), self.port));
let socket = try!(listener.socket_name());
let acceptor = try!(listener.listen());
let worker = acceptor.clone();
let mut worker = acceptor.clone();
spawn(proc() {
let mut acceptor = worker;
handler.handle(Incoming { from: acceptor.incoming() });
handler.handle(Incoming { from: worker.incoming() });
});
Ok(Listening {
@@ -51,12 +62,12 @@ impl Server {
/// An iterator over incoming connections, represented as pairs of
/// hyper Requests and Responses.
pub struct Incoming<'a> {
from: IncomingConnections<'a, TcpAcceptor>
pub struct Incoming<'a, A: 'a = HttpAcceptor> {
from: IncomingConnections<'a, A>
}
impl<'a> Iterator<(Request, Response<Fresh>)> for Incoming<'a> {
fn next(&mut self) -> Option<(Request, Response<Fresh>)> {
impl<'a, A: NetworkAcceptor<S>, S: NetworkStream> Iterator<(Request<S>, Response<Fresh, S>)> for Incoming<'a, A> {
fn next(&mut self) -> Option<(Request<S>, Response<Fresh, S>)> {
for conn in self.from {
match conn {
Ok(stream) => {
@@ -85,30 +96,30 @@ impl<'a> Iterator<(Request, Response<Fresh>)> for Incoming<'a> {
}
/// A listening server, which can later be closed.
pub struct Listening {
acceptor: TcpAcceptor,
pub struct Listening<A = HttpAcceptor> {
acceptor: A,
/// The socket address that the server is bound to.
pub socket_addr: SocketAddr,
}
impl Listening {
/// Stop the server from listening to its socket address.
impl<A: NetworkAcceptor<S>, S: NetworkStream> Listening<A> {
/// Stop the server from listening to it's socket address.
pub fn close(mut self) -> IoResult<()> {
debug!("closing server");
self.acceptor.close_accept()
self.acceptor.close()
}
}
/// A handler that can handle incoming requests for a server.
pub trait Handler: Send {
pub trait Handler<A: NetworkAcceptor<S>, S: NetworkStream>: Send {
/// Receives a `Request`/`Response` pair, and should perform some action on them.
///
/// This could reading from the request, and writing to the response.
fn handle(self, Incoming);
fn handle(self, Incoming<A>);
}
impl Handler for fn(Incoming) {
fn handle(self, incoming: Incoming) {
impl<A: NetworkAcceptor<S>, S: NetworkStream> Handler<A, S> for fn(Incoming<A>) {
fn handle(self, incoming: Incoming<A>) {
(self)(incoming)
}
}

View File

@@ -4,7 +4,6 @@
//! target URI, headers, and message body.
use std::io::{Reader, BufferedReader, IoResult};
use std::io::net::ip::SocketAddr;
use std::io::net::tcp::TcpStream;
use {HttpResult};
use version::{HttpVersion};
@@ -13,10 +12,11 @@ use header::Headers;
use header::common::ContentLength;
use rfc7230::{read_request_line};
use rfc7230::{HttpReader, SizedReader, ChunkedReader};
use net::{NetworkStream, HttpStream};
use uri::RequestUri;
/// A request bundles several parts of an incoming TCP stream, given to a `Handler`.
pub struct Request {
/// A request bundles several parts of an incoming `NetworkStream`, given to a `Handler`.
pub struct Request<S = HttpStream> {
/// The IP address of the remote connection.
pub remote_addr: SocketAddr,
/// The `Method`, such as `Get`, `Post`, etc.
@@ -27,19 +27,19 @@ pub struct Request {
pub uri: RequestUri,
/// The version of HTTP for this request.
pub version: HttpVersion,
body: HttpReader<BufferedReader<TcpStream>>
body: HttpReader<BufferedReader<S>>
}
impl Request {
impl<S: NetworkStream> Request<S> {
/// Create a new Request, reading the StartLine and Headers so they are
/// immediately useful.
pub fn new(mut tcp: TcpStream) -> HttpResult<Request> {
let remote_addr = try_io!(tcp.peer_name());
let mut tcp = BufferedReader::new(tcp);
let (method, uri, version) = try!(read_request_line(&mut tcp));
let mut headers = try!(Headers::from_raw(&mut tcp));
pub fn new(mut stream: S) -> HttpResult<Request<S>> {
let remote_addr = try_io!(stream.peer_name());
let mut stream = BufferedReader::new(stream);
let (method, uri, version) = try!(read_request_line(&mut stream));
let mut headers = try!(Headers::from_raw(&mut stream));
debug!("{} {} {}", method, uri, version);
debug!("{}", headers);
@@ -47,12 +47,12 @@ impl Request {
let body = if headers.has::<ContentLength>() {
match headers.get_ref::<ContentLength>() {
Some(&ContentLength(len)) => SizedReader(tcp, len),
Some(&ContentLength(len)) => SizedReader(stream, len),
None => unreachable!()
}
} else {
todo!("check for Transfer-Encoding: chunked");
ChunkedReader(tcp, None)
ChunkedReader(stream, None)
};
Ok(Request {
@@ -66,7 +66,7 @@ impl Request {
}
}
impl Reader for Request {
impl<S: NetworkStream> Reader for Request<S> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.body.read(buf)
}

View File

@@ -3,15 +3,15 @@
//! These are responses sent by a `hyper::Server` to clients, after
//! receiving a request.
use std::io::{BufferedWriter, IoResult};
use std::io::net::tcp::TcpStream;
use time::now_utc;
use header;
use header::common;
use status;
use version;
use rfc7230::{CR, LF, LINE_ENDING};
use status;
use net::NetworkStream;
use version;
/// Phantom type indicating Headers and StatusCode have not been written.
pub struct Fresh;
@@ -26,18 +26,18 @@ impl WriteStatus for Streaming {}
impl WriteStatus for Fresh {}
/// The outgoing half for a Tcp connection, created by a `Server` and given to a `Handler`.
pub struct Response<W: WriteStatus> {
pub struct Response<W: WriteStatus, S: NetworkStream> {
/// The HTTP version of this response.
pub version: version::HttpVersion,
// Stream the Response is writing to, not accessible through UnwrittenResponse
body: BufferedWriter<TcpStream>, // TODO: use a HttpWriter from rfc7230
body: BufferedWriter<S>, // TODO: use a HttpWriter from rfc7230
// The status code for the request.
status: status::StatusCode,
// The outgoing headers on this response.
headers: header::Headers
}
impl<W: WriteStatus> Response<W> {
impl<W: WriteStatus, S: NetworkStream> Response<W, S> {
/// The status of this response.
#[inline]
pub fn status(&self) -> status::StatusCode { self.status }
@@ -47,9 +47,9 @@ impl<W: WriteStatus> Response<W> {
/// Construct a Response from its constituent parts.
pub fn construct(version: version::HttpVersion,
body: BufferedWriter<TcpStream>,
body: BufferedWriter<S>,
status: status::StatusCode,
headers: header::Headers) -> Response<Fresh> {
headers: header::Headers) -> Response<Fresh, S> {
Response {
status: status,
version: version,
@@ -59,19 +59,19 @@ impl<W: WriteStatus> Response<W> {
}
}
impl Response<Fresh> {
impl<S: NetworkStream> Response<Fresh, S> {
/// Creates a new Response that can be used to write to a network stream.
pub fn new(tcp: TcpStream) -> Response<Fresh> {
pub fn new(stream: S) -> Response<Fresh, S> {
Response {
status: status::Ok,
version: version::Http11,
headers: header::Headers::new(),
body: BufferedWriter::new(tcp)
body: BufferedWriter::new(stream)
}
}
/// Consume this Response<Fresh>, writing the Headers and Status and creating a Response<Streaming>
pub fn start(mut self) -> IoResult<Response<Streaming>> {
pub fn start(mut self) -> IoResult<Response<Streaming, S>> {
debug!("writing head: {} {}", self.version, self.status);
try!(write!(self.body, "{} {}{}{}", self.version, self.status, CR as char, LF as char));
@@ -104,12 +104,12 @@ impl Response<Fresh> {
pub fn headers_mut(&mut self) -> &mut header::Headers { &mut self.headers }
/// Deconstruct this Response into its constituent parts.
pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter<TcpStream>, status::StatusCode, header::Headers) {
pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter<S>, status::StatusCode, header::Headers) {
(self.version, self.body, self.status, self.headers)
}
}
impl Response<Streaming> {
impl<S: NetworkStream> Response<Streaming, S> {
/// Flushes all writing of a response to the client.
pub fn end(mut self) -> IoResult<()> {
debug!("ending");
@@ -117,7 +117,7 @@ impl Response<Streaming> {
}
}
impl Writer for Response<Streaming> {
impl<S: NetworkStream> Writer for Response<Streaming, S> {
fn write(&mut self, msg: &[u8]) -> IoResult<()> {
debug!("write {:u} bytes", msg.len());
self.body.write(msg)