feat(hyper): switch to std::io, std::net, and std::path.

All instances of `old_io` and `old_path` were switched to use the new
shiny `std::io`, `std::net`, and `std::path` modules. This means that
`Request` and `Response` implement `Read` and `Write` now.

Because of the changes to `TcpListener`, this also takes the opportunity
to correct the method usage of `Server`. As with other
languages/frameworks, the server is first created with a handler, and
then a host/port is passed to a `listen` method. This reverses what
`Server` used to do.

Closes #347

BREAKING CHANGE: Check the docs. Everything was touched.
This commit is contained in:
Sean McArthur
2015-02-17 15:29:52 -08:00
parent 7235d3f74a
commit 0fd6fcd7c7
22 changed files with 641 additions and 639 deletions

View File

@@ -27,12 +27,13 @@ Hello World Server:
```rust ```rust
extern crate hyper; extern crate hyper;
use hyper::status::StatusCode; use std::io::Write;
use hyper::server::Server; use std::net::IpAddr;
use hyper::server::request::Request;
use hyper::server::response::Response; use hyper::Server;
use hyper::server::Request;
use hyper::server::Response;
use hyper::net::Fresh; use hyper::net::Fresh;
use hyper::IpAddr::Ipv4Addr;
fn hello(_: Request, mut res: Response<Fresh>) { fn hello(_: Request, mut res: Response<Fresh>) {
let mut res = res.start().unwrap(); let mut res = res.start().unwrap();
@@ -41,8 +42,7 @@ fn hello(_: Request, mut res: Response<Fresh>) {
} }
fn main() { fn main() {
let server = Server::http(Ipv4Addr(127, 0, 0, 1), 1337); Server::http(hello).listen(IpAddr::new_v4(127, 0, 0, 1), 3000).unwrap();
server.listen(hello).unwrap();
} }
``` ```
@@ -51,7 +51,9 @@ Client:
```rust ```rust
extern crate hyper; extern crate hyper;
use hyper::client::Client; use std::io::Read;
use hyper::Client;
use hyper::header::Connection; use hyper::header::Connection;
use hyper::header::ConnectionOption; use hyper::header::ConnectionOption;
@@ -67,7 +69,8 @@ fn main() {
.send().unwrap(); .send().unwrap();
// Read the Response. // Read the Response.
let body = res.read_to_string().unwrap(); let mut body = String::new();
res.read_to_string(&mut body).unwrap();
println!("Response: {}", body); println!("Response: {}", body);
} }

View File

@@ -1,33 +1,53 @@
#![feature(core, old_io, test)] #![feature(collections, io, net, test)]
extern crate hyper; extern crate hyper;
extern crate test; extern crate test;
use std::fmt; use std::fmt;
use std::old_io::net::ip::Ipv4Addr; use std::io::{self, Read, Write, Cursor};
use hyper::server::{Request, Response, Server}; use std::net::SocketAddr;
use hyper::header::Headers;
use hyper::Client;
fn listen() -> hyper::server::Listening { use hyper::net;
let server = Server::http(Ipv4Addr(127, 0, 0, 1), 0);
server.listen(handle).unwrap() static README: &'static [u8] = include_bytes!("../README.md");
struct MockStream {
read: Cursor<Vec<u8>>
} }
macro_rules! try_return( impl MockStream {
($e:expr) => {{ fn new() -> MockStream {
match $e { let head = b"HTTP/1.1 200 OK\r\nServer: Mock\r\n\r\n";
Ok(v) => v, let mut res = head.to_vec();
Err(..) => return res.push_all(README);
MockStream {
read: Cursor::new(res)
} }
}} }
); }
fn handle(_r: Request, res: Response) { impl Clone for MockStream {
static BODY: &'static [u8] = b"Benchmarking hyper vs others!"; fn clone(&self) -> MockStream {
let mut res = try_return!(res.start()); MockStream {
try_return!(res.write_all(BODY)); read: Cursor::new(self.read.get_ref().clone())
try_return!(res.end()); }
}
}
impl Read for MockStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.read.read(buf)
}
}
impl Write for MockStream {
fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
// we're mocking, what do we care.
Ok(msg.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
} }
#[derive(Clone)] #[derive(Clone)]
@@ -48,17 +68,36 @@ impl hyper::header::HeaderFormat for Foo {
} }
} }
#[bench] impl net::NetworkStream for MockStream {
fn bench_hyper(b: &mut test::Bencher) { fn peer_addr(&mut self) -> io::Result<SocketAddr> {
let mut listening = listen(); Ok("127.0.0.1:1337".parse().unwrap())
let s = format!("http://{}/", listening.socket); }
let url = s.as_slice(); }
let mut client = Client::new();
let mut headers = Headers::new(); struct MockConnector;
headers.set(Foo);
b.iter(|| { impl net::NetworkConnector for MockConnector {
client.get(url).header(Foo).send().unwrap().read_to_string().unwrap(); type Stream = MockStream;
}); fn connect(&mut self, _: &str, _: u16, _: &str) -> io::Result<MockStream> {
listening.close().unwrap() Ok(MockStream::new())
}
}
#[bench]
fn bench_mock_hyper(b: &mut test::Bencher) {
let url = "http://127.0.0.1:1337/";
b.iter(|| {
let mut req = hyper::client::Request::with_connector(
hyper::Get, hyper::Url::parse(url).unwrap(), &mut MockConnector
).unwrap();
req.headers_mut().set(Foo);
let mut s = String::new();
req
.start().unwrap()
.send().unwrap()
.read_to_string(&mut s).unwrap()
});
} }

View File

@@ -1,98 +0,0 @@
#![feature(collections, old_io, test)]
extern crate hyper;
extern crate test;
use std::fmt;
use std::old_io::{IoResult, MemReader};
use std::old_io::net::ip::SocketAddr;
use hyper::net;
static README: &'static [u8] = include_bytes!("../README.md");
struct MockStream {
read: MemReader,
}
impl Clone for MockStream {
fn clone(&self) -> MockStream {
MockStream::new()
}
}
impl MockStream {
fn new() -> MockStream {
let head = b"HTTP/1.1 200 OK\r\nServer: Mock\r\n\r\n";
let mut res = head.to_vec();
res.push_all(README);
MockStream {
read: MemReader::new(res),
}
}
}
impl Reader for MockStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
self.read.read(buf)
}
}
impl Writer for MockStream {
fn write_all(&mut self, _msg: &[u8]) -> IoResult<()> {
// we're mocking, what do we care.
Ok(())
}
}
#[derive(Clone)]
struct Foo;
impl hyper::header::Header for Foo {
fn header_name() -> &'static str {
"x-foo"
}
fn parse_header(_: &[Vec<u8>]) -> Option<Foo> {
None
}
}
impl hyper::header::HeaderFormat for Foo {
fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.write_str("Bar")
}
}
impl net::NetworkStream for MockStream {
fn peer_name(&mut self) -> IoResult<SocketAddr> {
Ok("127.0.0.1:1337".parse().unwrap())
}
}
struct MockConnector;
impl net::NetworkConnector for MockConnector {
type Stream = MockStream;
fn connect(&mut self, _: &str, _: u16, _: &str) -> IoResult<MockStream> {
Ok(MockStream::new())
}
}
#[bench]
fn bench_mock_hyper(b: &mut test::Bencher) {
let url = "http://127.0.0.1:1337/";
b.iter(|| {
let mut req = hyper::client::Request::with_connector(
hyper::Get, hyper::Url::parse(url).unwrap(), &mut MockConnector
).unwrap();
req.headers_mut().set(Foo);
req
.start().unwrap()
.send().unwrap()
.read_to_string().unwrap()
});
}

View File

@@ -1,9 +1,10 @@
#![feature(old_io, test)] #![feature(io, net, test)]
extern crate hyper; extern crate hyper;
extern crate test; extern crate test;
use test::Bencher; use test::Bencher;
use std::old_io::net::ip::Ipv4Addr; use std::io::{Read, Write};
use std::net::IpAddr;
use hyper::method::Method::Get; use hyper::method::Method::Get;
use hyper::server::{Request, Response}; use hyper::server::{Request, Response};
@@ -12,7 +13,8 @@ static PHRASE: &'static [u8] = b"Benchmarking hyper vs others!";
fn request(url: hyper::Url) { fn request(url: hyper::Url) {
let req = hyper::client::Request::new(Get, url).unwrap(); let req = hyper::client::Request::new(Get, url).unwrap();
req.start().unwrap().send().unwrap().read_to_string().unwrap(); let mut s = String::new();
req.start().unwrap().send().unwrap().read_to_string(&mut s).unwrap();
} }
fn hyper_handle(_: Request, res: Response) { fn hyper_handle(_: Request, res: Response) {
@@ -23,8 +25,8 @@ fn hyper_handle(_: Request, res: Response) {
#[bench] #[bench]
fn bench_hyper(b: &mut Bencher) { fn bench_hyper(b: &mut Bencher) {
let server = hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 0); let server = hyper::Server::http(hyper_handle);
let mut listener = server.listen(hyper_handle).unwrap(); let mut listener = server.listen(IpAddr::new_v4(127, 0, 0, 1), 0).unwrap();
let url = hyper::Url::parse(&*format!("http://{}", listener.socket)).unwrap(); let url = hyper::Url::parse(&*format!("http://{}", listener.socket)).unwrap();
b.iter(|| request(url.clone())); b.iter(|| request(url.clone()));

View File

@@ -1,9 +1,7 @@
#![feature(env, old_io)] #![feature(env)]
extern crate hyper; extern crate hyper;
use std::env; use std::env;
use std::old_io::stdout;
use std::old_io::util::copy;
use hyper::Client; use hyper::Client;
@@ -18,16 +16,12 @@ fn main() {
let mut client = Client::new(); let mut client = Client::new();
let mut res = match client.get(&*url).send() { let res = match client.get(&*url).send() {
Ok(res) => res, Ok(res) => res,
Err(err) => panic!("Failed to connect: {:?}", err) Err(err) => panic!("Failed to connect: {:?}", err)
}; };
println!("Response: {}", res.status); println!("Response: {}", res.status);
println!("Headers:\n{}", res.headers); println!("Headers:\n{}", res.headers);
match copy(&mut res, &mut stdout()) { //TODO: add copy back when std::stdio impls std::io::Write.
Ok(..) => (),
Err(e) => panic!("Stream failure: {:?}", e)
};
} }

View File

@@ -1,7 +1,8 @@
#![feature(old_io)] #![feature(io, net)]
extern crate hyper; extern crate hyper;
use std::old_io::net::ip::Ipv4Addr; use std::io::Write;
use std::net::IpAddr;
use hyper::server::{Request, Response}; use hyper::server::{Request, Response};
static PHRASE: &'static [u8] = b"Hello World!"; static PHRASE: &'static [u8] = b"Hello World!";
@@ -13,7 +14,7 @@ fn hello(_: Request, res: Response) {
} }
fn main() { fn main() {
let _listening = hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000) let _listening = hyper::Server::http(hello)
.listen(hello).unwrap(); .listen(IpAddr::new_v4(127, 0, 0, 1), 3000).unwrap();
println!("Listening on http://127.0.0.1:3000"); println!("Listening on http://127.0.0.1:3000");
} }

View File

@@ -1,9 +1,9 @@
#![feature(old_io)] #![feature(io, net)]
extern crate hyper; extern crate hyper;
#[macro_use] extern crate log; #[macro_use] extern crate log;
use std::old_io::util::copy; use std::io::{Write, copy};
use std::old_io::net::ip::Ipv4Addr; use std::net::IpAddr;
use hyper::{Get, Post}; use hyper::{Get, Post};
use hyper::header::ContentLength; use hyper::header::ContentLength;
@@ -50,7 +50,7 @@ fn echo(mut req: Request, mut res: Response) {
} }
fn main() { fn main() {
let server = Server::http(Ipv4Addr(127, 0, 0, 1), 1337); let server = Server::http(echo);
let _guard = server.listen(echo).unwrap(); let _guard = server.listen(IpAddr::new_v4(127, 0, 0, 1), 1337).unwrap();
println!("Listening on http://127.0.0.1:1337"); println!("Listening on http://127.0.0.1:1337");
} }

View File

@@ -18,8 +18,7 @@
//! to the `status`, the `headers`, and the response body via the `Writer` //! to the `status`, the `headers`, and the response body via the `Writer`
//! trait. //! trait.
use std::default::Default; use std::default::Default;
use std::old_io::IoResult; use std::io::{self, copy, Read};
use std::old_io::util::copy;
use std::iter::Extend; use std::iter::Extend;
use url::UrlParser; use url::UrlParser;
@@ -30,7 +29,7 @@ use header::{ContentLength, Location};
use method::Method; use method::Method;
use net::{NetworkConnector, HttpConnector, ContextVerifier}; use net::{NetworkConnector, HttpConnector, ContextVerifier};
use status::StatusClass::Redirection; use status::StatusClass::Redirection;
use {Url, Port, HttpResult}; use {Url, HttpResult};
use HttpError::HttpUriError; use HttpError::HttpUriError;
pub use self::request::Request; pub use self::request::Request;
@@ -238,9 +237,9 @@ pub trait IntoBody<'a> {
/// The target enum for the IntoBody trait. /// The target enum for the IntoBody trait.
pub enum Body<'a> { pub enum Body<'a> {
/// A Reader does not necessarily know it's size, so it is chunked. /// A Reader does not necessarily know it's size, so it is chunked.
ChunkedBody(&'a mut (Reader + 'a)), ChunkedBody(&'a mut (Read + 'a)),
/// For Readers that can know their size, like a `File`. /// For Readers that can know their size, like a `File`.
SizedBody(&'a mut (Reader + 'a), u64), SizedBody(&'a mut (Read + 'a), u64),
/// A String has a size, and uses Content-Length. /// A String has a size, and uses Content-Length.
BufBody(&'a [u8] , usize), BufBody(&'a [u8] , usize),
} }
@@ -255,13 +254,13 @@ impl<'a> Body<'a> {
} }
} }
impl<'a> Reader for Body<'a> { impl<'a> Read for Body<'a> {
#[inline] #[inline]
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self { match *self {
Body::ChunkedBody(ref mut r) => r.read(buf), Body::ChunkedBody(ref mut r) => r.read(buf),
Body::SizedBody(ref mut r, _) => r.read(buf), Body::SizedBody(ref mut r, _) => r.read(buf),
Body::BufBody(ref mut r, _) => r.read(buf), Body::BufBody(ref mut r, _) => Read::read(r, buf),
} }
} }
} }
@@ -288,7 +287,7 @@ impl<'a> IntoBody<'a> for &'a str {
} }
} }
impl<'a, R: Reader> IntoBody<'a> for &'a mut R { impl<'a, R: Read> IntoBody<'a> for &'a mut R {
#[inline] #[inline]
fn into_body(self) -> Body<'a> { fn into_body(self) -> Body<'a> {
Body::ChunkedBody(self) Body::ChunkedBody(self)
@@ -337,7 +336,7 @@ impl Default for RedirectPolicy {
} }
} }
fn get_host_and_port(url: &Url) -> HttpResult<(String, Port)> { fn get_host_and_port(url: &Url) -> HttpResult<(String, u16)> {
let host = match url.serialize_host() { let host = match url.serialize_host() {
Some(host) => host, Some(host) => host,
None => return Err(HttpUriError(UrlError::EmptyHost)) None => return Err(HttpUriError(UrlError::EmptyHost))

View File

@@ -1,6 +1,6 @@
//! Client Requests //! Client Requests
use std::old_io::{BufferedWriter, IoResult};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::io::{self, Write, BufWriter};
use url::Url; use url::Url;
@@ -23,7 +23,7 @@ pub struct Request<W> {
/// The HTTP version of this request. /// The HTTP version of this request.
pub version: version::HttpVersion, pub version: version::HttpVersion,
body: HttpWriter<BufferedWriter<Box<NetworkStream + Send>>>, body: HttpWriter<BufWriter<Box<NetworkStream + Send>>>,
headers: Headers, headers: Headers,
method: method::Method, method: method::Method,
@@ -59,7 +59,7 @@ impl Request<Fresh> {
let (host, port) = try!(get_host_and_port(&url)); let (host, port) = try!(get_host_and_port(&url));
let stream = try!(connector.connect(&*host, port, &*url.scheme)); let stream = try!(connector.connect(&*host, port, &*url.scheme));
let stream = ThroughWriter(BufferedWriter::new(box stream as Box<NetworkStream + Send>)); let stream = ThroughWriter(BufWriter::new(box stream as Box<NetworkStream + Send>));
let mut headers = Headers::new(); let mut headers = Headers::new();
headers.set(Host { headers.set(Host {
@@ -96,7 +96,7 @@ impl Request<Fresh> {
Method::Get | Method::Head => { Method::Get | Method::Head => {
debug!("headers [\n{:?}]", self.headers); debug!("headers [\n{:?}]", self.headers);
try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING)); try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING));
EmptyWriter(self.body.unwrap()) EmptyWriter(self.body.into_inner())
}, },
_ => { _ => {
let mut chunked = true; let mut chunked = true;
@@ -131,9 +131,9 @@ impl Request<Fresh> {
try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING)); try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING));
if chunked { if chunked {
ChunkedWriter(self.body.unwrap()) ChunkedWriter(self.body.into_inner())
} else { } else {
SizedWriter(self.body.unwrap(), len) SizedWriter(self.body.into_inner(), len)
} }
} }
}; };
@@ -158,19 +158,19 @@ impl Request<Streaming> {
/// ///
/// Consumes the Request. /// Consumes the Request.
pub fn send(self) -> HttpResult<Response> { pub fn send(self) -> HttpResult<Response> {
let raw = try!(self.body.end()).into_inner(); let raw = try!(self.body.end()).into_inner().unwrap(); // end() already flushes
Response::new(raw) Response::new(raw)
} }
} }
impl Writer for Request<Streaming> { impl Write for Request<Streaming> {
#[inline] #[inline]
fn write_all(&mut self, msg: &[u8]) -> IoResult<()> { fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
self.body.write_all(msg) self.body.write(msg)
} }
#[inline] #[inline]
fn flush(&mut self) -> IoResult<()> { fn flush(&mut self) -> io::Result<()> {
self.body.flush() self.body.flush()
} }
} }
@@ -191,8 +191,8 @@ mod tests {
).unwrap(); ).unwrap();
let req = req.start().unwrap(); let req = req.start().unwrap();
let stream = *req.body.end().unwrap() let stream = *req.body.end().unwrap()
.into_inner().downcast::<MockStream>().ok().unwrap(); .into_inner().unwrap().downcast::<MockStream>().ok().unwrap();
let bytes = stream.write.into_inner(); let bytes = stream.write;
let s = from_utf8(&bytes[..]).unwrap(); let s = from_utf8(&bytes[..]).unwrap();
assert!(!s.contains("Content-Length:")); assert!(!s.contains("Content-Length:"));
assert!(!s.contains("Transfer-Encoding:")); assert!(!s.contains("Transfer-Encoding:"));
@@ -205,8 +205,8 @@ mod tests {
).unwrap(); ).unwrap();
let req = req.start().unwrap(); let req = req.start().unwrap();
let stream = *req.body.end().unwrap() let stream = *req.body.end().unwrap()
.into_inner().downcast::<MockStream>().ok().unwrap(); .into_inner().unwrap().downcast::<MockStream>().ok().unwrap();
let bytes = stream.write.into_inner(); let bytes = stream.write;
let s = from_utf8(&bytes[..]).unwrap(); let s = from_utf8(&bytes[..]).unwrap();
assert!(!s.contains("Content-Length:")); assert!(!s.contains("Content-Length:"));
assert!(!s.contains("Transfer-Encoding:")); assert!(!s.contains("Transfer-Encoding:"));

View File

@@ -1,6 +1,6 @@
//! Client Responses //! Client Responses
use std::io::{self, Read, BufReader};
use std::num::FromPrimitive; use std::num::FromPrimitive;
use std::old_io::{BufferedReader, IoResult};
use std::marker::PhantomData; use std::marker::PhantomData;
use header; use header;
@@ -23,7 +23,7 @@ pub struct Response<S = HttpStream> {
/// The HTTP version of this response from the server. /// The HTTP version of this response from the server.
pub version: version::HttpVersion, pub version: version::HttpVersion,
status_raw: RawStatus, status_raw: RawStatus,
body: HttpReader<BufferedReader<Box<NetworkStream + Send>>>, body: HttpReader<BufReader<Box<NetworkStream + Send>>>,
_marker: PhantomData<S>, _marker: PhantomData<S>,
} }
@@ -35,7 +35,7 @@ impl Response {
/// Creates a new response from a server. /// Creates a new response from a server.
pub fn new(stream: Box<NetworkStream + Send>) -> HttpResult<Response> { pub fn new(stream: Box<NetworkStream + Send>) -> HttpResult<Response> {
let mut stream = BufferedReader::new(stream); let mut stream = BufReader::new(stream);
let (version, raw_status) = try!(read_status_line(&mut stream)); let (version, raw_status) = try!(read_status_line(&mut stream));
let status = match FromPrimitive::from_u16(raw_status.0) { let status = match FromPrimitive::from_u16(raw_status.0) {
Some(status) => status, Some(status) => status,
@@ -89,13 +89,13 @@ impl Response {
/// Consumes the Request to return the NetworkStream underneath. /// Consumes the Request to return the NetworkStream underneath.
pub fn into_inner(self) -> Box<NetworkStream + Send> { pub fn into_inner(self) -> Box<NetworkStream + Send> {
self.body.unwrap().into_inner() self.body.into_inner().into_inner()
} }
} }
impl Reader for Response { impl Read for Response {
#[inline] #[inline]
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.body.read(buf) self.body.read(buf)
} }
} }
@@ -104,7 +104,7 @@ impl Reader for Response {
mod tests { mod tests {
use std::borrow::Cow::Borrowed; use std::borrow::Cow::Borrowed;
use std::boxed::BoxAny; use std::boxed::BoxAny;
use std::old_io::BufferedReader; use std::io::{self, Read, BufReader};
use std::marker::PhantomData; use std::marker::PhantomData;
use header::Headers; use header::Headers;
@@ -119,14 +119,20 @@ mod tests {
use super::Response; use super::Response;
fn read_to_string(mut r: Response) -> io::Result<String> {
let mut s = String::new();
try!(r.read_to_string(&mut s));
Ok(s)
}
#[test] #[test]
fn test_unwrap() { fn test_into_inner() {
let res = Response { let res = Response {
status: status::StatusCode::Ok, status: status::StatusCode::Ok,
headers: Headers::new(), headers: Headers::new(),
version: version::HttpVersion::Http11, version: version::HttpVersion::Http11,
body: EofReader(BufferedReader::new(box MockStream::new() as Box<NetworkStream + Send>)), body: EofReader(BufReader::new(box MockStream::new() as Box<NetworkStream + Send>)),
status_raw: RawStatus(200, Borrowed("OK")), status_raw: RawStatus(200, Borrowed("OK")),
_marker: PhantomData, _marker: PhantomData,
}; };
@@ -152,7 +158,7 @@ mod tests {
\r\n" \r\n"
); );
let mut res = Response::new(box stream).unwrap(); let res = Response::new(box stream).unwrap();
// The status line is correct? // The status line is correct?
assert_eq!(res.status, status::StatusCode::Ok); assert_eq!(res.status, status::StatusCode::Ok);
@@ -166,8 +172,7 @@ mod tests {
None => panic!("Transfer-Encoding: chunked expected!"), None => panic!("Transfer-Encoding: chunked expected!"),
}; };
// The body is correct? // The body is correct?
let body = res.read_to_string().unwrap(); assert_eq!(read_to_string(res), Ok("qwert".to_string()));
assert_eq!("qwert", body);
} }
/// Tests that when a chunk size is not a valid radix-16 number, an error /// Tests that when a chunk size is not a valid radix-16 number, an error
@@ -184,9 +189,9 @@ mod tests {
\r\n" \r\n"
); );
let mut res = Response::new(box stream).unwrap(); let res = Response::new(box stream).unwrap();
assert!(res.read_to_string().is_err()); assert!(read_to_string(res).is_err());
} }
/// Tests that when a chunk size contains an invalid extension, an error is /// Tests that when a chunk size contains an invalid extension, an error is
@@ -203,9 +208,9 @@ mod tests {
\r\n" \r\n"
); );
let mut res = Response::new(box stream).unwrap(); let res = Response::new(box stream).unwrap();
assert!(res.read_to_string().is_err()); assert!(read_to_string(res).is_err());
} }
/// Tests that when a valid extension that contains a digit is appended to /// Tests that when a valid extension that contains a digit is appended to
@@ -222,8 +227,8 @@ mod tests {
\r\n" \r\n"
); );
let mut res = Response::new(box stream).unwrap(); let res = Response::new(box stream).unwrap();
assert_eq!("1", res.read_to_string().unwrap()) assert_eq!(read_to_string(res), Ok("1".to_string()));
} }
} }

View File

@@ -1,4 +1,4 @@
use header::{self, QualityItem}; use header::QualityItem;
use std::str::FromStr; use std::str::FromStr;
use std::fmt; use std::fmt;
@@ -49,7 +49,6 @@ impl_list_header!(AcceptLanguage,
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use header::{Header, qitem, Quality, QualityItem}; use header::{Header, qitem, Quality, QualityItem};
use super::*; use super::*;
#[test] #[test]

View File

@@ -142,14 +142,9 @@ impl FromStr for Basic {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::old_io::MemReader;
use super::{Authorization, Basic}; use super::{Authorization, Basic};
use super::super::super::{Headers}; use super::super::super::{Headers};
fn mem(s: &str) -> MemReader {
MemReader::new(s.as_bytes().to_vec())
}
#[test] #[test]
fn test_raw_auth() { fn test_raw_auth() {
let mut headers = Headers::new(); let mut headers = Headers::new();
@@ -159,7 +154,7 @@ mod tests {
#[test] #[test]
fn test_raw_auth_parse() { fn test_raw_auth_parse() {
let headers = Headers::from_raw(&mut mem("Authorization: foo bar baz\r\n\r\n")).unwrap(); let headers = Headers::from_raw(&mut b"Authorization: foo bar baz\r\n\r\n").unwrap();
assert_eq!(&headers.get::<Authorization<String>>().unwrap().0[..], "foo bar baz"); assert_eq!(&headers.get::<Authorization<String>>().unwrap().0[..], "foo bar baz");
} }
@@ -179,7 +174,7 @@ mod tests {
#[test] #[test]
fn test_basic_auth_parse() { fn test_basic_auth_parse() {
let headers = Headers::from_raw(&mut mem("Authorization: Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==\r\n\r\n")).unwrap(); let headers = Headers::from_raw(&mut b"Authorization: Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==\r\n\r\n").unwrap();
let auth = headers.get::<Authorization<Basic>>().unwrap(); let auth = headers.get::<Authorization<Basic>>().unwrap();
assert_eq!(&auth.0.username[..], "Aladdin"); assert_eq!(&auth.0.username[..], "Aladdin");
assert_eq!(auth.0.password, Some("open sesame".to_string())); assert_eq!(auth.0.password, Some("open sesame".to_string()));
@@ -187,7 +182,7 @@ mod tests {
#[test] #[test]
fn test_basic_auth_parse_no_password() { fn test_basic_auth_parse_no_password() {
let headers = Headers::from_raw(&mut mem("Authorization: Basic QWxhZGRpbjo=\r\n\r\n")).unwrap(); let headers = Headers::from_raw(&mut b"Authorization: Basic QWxhZGRpbjo=\r\n\r\n").unwrap();
let auth = headers.get::<Authorization<Basic>>().unwrap(); let auth = headers.get::<Authorization<Basic>>().unwrap();
assert_eq!(auth.0.username.as_slice(), "Aladdin"); assert_eq!(auth.0.username.as_slice(), "Aladdin");
assert_eq!(auth.0.password, Some("".to_string())); assert_eq!(auth.0.password, Some("".to_string()));

View File

@@ -1,5 +1,4 @@
use header::{Header, HeaderFormat}; use header::{Header, HeaderFormat};
use Port;
use std::fmt; use std::fmt;
use header::parsing::from_one_raw_str; use header::parsing::from_one_raw_str;
@@ -15,7 +14,7 @@ pub struct Host {
/// The hostname, such a example.domain. /// The hostname, such a example.domain.
pub hostname: String, pub hostname: String,
/// An optional port number. /// An optional port number.
pub port: Option<Port> pub port: Option<u16>
} }
impl Header for Host { impl Header for Host {

View File

@@ -7,6 +7,7 @@
use std::any::{Any, TypeId}; use std::any::{Any, TypeId};
use std::borrow::Cow::{Borrowed, Owned}; use std::borrow::Cow::{Borrowed, Owned};
use std::fmt; use std::fmt;
use std::io::Read;
use std::raw::TraitObject; use std::raw::TraitObject;
use std::str::from_utf8; use std::str::from_utf8;
use std::collections::HashMap; use std::collections::HashMap;
@@ -132,7 +133,7 @@ impl Headers {
} }
#[doc(hidden)] #[doc(hidden)]
pub fn from_raw<R: Reader>(rdr: &mut R) -> HttpResult<Headers> { pub fn from_raw<R: Read>(rdr: &mut R) -> HttpResult<Headers> {
let mut headers = Headers::new(); let mut headers = Headers::new();
let mut count = 0u32; let mut count = 0u32;
loop { loop {
@@ -534,7 +535,6 @@ impl<'a, H: HeaderFormat> fmt::Debug for HeaderFormatter<'a, H> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::old_io::MemReader;
use std::fmt; use std::fmt;
use mime::Mime; use mime::Mime;
use mime::TopLevel::Text; use mime::TopLevel::Text;
@@ -544,13 +544,9 @@ mod tests {
use test::Bencher; use test::Bencher;
fn mem(s: &str) -> MemReader {
MemReader::new(s.as_bytes().to_vec())
}
#[test] #[test]
fn test_from_raw() { fn test_from_raw() {
let headers = Headers::from_raw(&mut mem("Content-Length: 10\r\n\r\n")).unwrap(); let headers = Headers::from_raw(&mut b"Content-Length: 10\r\n\r\n").unwrap();
assert_eq!(headers.get(), Some(&ContentLength(10))); assert_eq!(headers.get(), Some(&ContentLength(10)));
} }
@@ -603,21 +599,21 @@ mod tests {
#[test] #[test]
fn test_different_structs_for_same_header() { fn test_different_structs_for_same_header() {
let headers = Headers::from_raw(&mut mem("Content-Length: 10\r\n\r\n")).unwrap(); let headers = Headers::from_raw(&mut b"Content-Length: 10\r\n\r\n").unwrap();
let ContentLength(_) = *headers.get::<ContentLength>().unwrap(); let ContentLength(_) = *headers.get::<ContentLength>().unwrap();
assert!(headers.get::<CrazyLength>().is_none()); assert!(headers.get::<CrazyLength>().is_none());
} }
#[test] #[test]
fn test_trailing_whitespace() { fn test_trailing_whitespace() {
let headers = Headers::from_raw(&mut mem("Content-Length: 10 \r\n\r\n")).unwrap(); let headers = Headers::from_raw(&mut b"Content-Length: 10 \r\n\r\n").unwrap();
let ContentLength(_) = *headers.get::<ContentLength>().unwrap(); let ContentLength(_) = *headers.get::<ContentLength>().unwrap();
assert!(headers.get::<CrazyLength>().is_none()); assert!(headers.get::<CrazyLength>().is_none());
} }
#[test] #[test]
fn test_multiple_reads() { fn test_multiple_reads() {
let headers = Headers::from_raw(&mut mem("Content-Length: 10\r\n\r\n")).unwrap(); let headers = Headers::from_raw(&mut b"Content-Length: 10\r\n\r\n").unwrap();
let ContentLength(one) = *headers.get::<ContentLength>().unwrap(); let ContentLength(one) = *headers.get::<ContentLength>().unwrap();
let ContentLength(two) = *headers.get::<ContentLength>().unwrap(); let ContentLength(two) = *headers.get::<ContentLength>().unwrap();
assert_eq!(one, two); assert_eq!(one, two);
@@ -625,14 +621,14 @@ mod tests {
#[test] #[test]
fn test_different_reads() { fn test_different_reads() {
let headers = Headers::from_raw(&mut mem("Content-Length: 10\r\nContent-Type: text/plain\r\n\r\n")).unwrap(); let headers = Headers::from_raw(&mut b"Content-Length: 10\r\nContent-Type: text/plain\r\n\r\n").unwrap();
let ContentLength(_) = *headers.get::<ContentLength>().unwrap(); let ContentLength(_) = *headers.get::<ContentLength>().unwrap();
let ContentType(_) = *headers.get::<ContentType>().unwrap(); let ContentType(_) = *headers.get::<ContentType>().unwrap();
} }
#[test] #[test]
fn test_get_mutable() { fn test_get_mutable() {
let mut headers = Headers::from_raw(&mut mem("Content-Length: 10\r\nContent-Type: text/plain\r\n\r\n")).unwrap(); let mut headers = Headers::from_raw(&mut b"Content-Length: 10\r\nContent-Type: text/plain\r\n\r\n").unwrap();
*headers.get_mut::<ContentLength>().unwrap() = ContentLength(20); *headers.get_mut::<ContentLength>().unwrap() = ContentLength(20);
assert_eq!(*headers.get::<ContentLength>().unwrap(), ContentLength(20)); assert_eq!(*headers.get::<ContentLength>().unwrap(), ContentLength(20));
} }
@@ -653,7 +649,7 @@ mod tests {
#[test] #[test]
fn test_headers_show_raw() { fn test_headers_show_raw() {
let headers = Headers::from_raw(&mut mem("Content-Length: 10\r\n\r\n")).unwrap(); let headers = Headers::from_raw(&mut b"Content-Length: 10\r\n\r\n").unwrap();
let s = headers.to_string(); let s = headers.to_string();
assert_eq!(s, "Content-Length: 10\r\n"); assert_eq!(s, "Content-Length: 10\r\n");
} }
@@ -720,7 +716,7 @@ mod tests {
#[bench] #[bench]
fn bench_headers_from_raw(b: &mut Bencher) { fn bench_headers_from_raw(b: &mut Bencher) {
b.iter(|| Headers::from_raw(&mut mem("Content-Length: 10\r\n\r\n")).unwrap()) b.iter(|| Headers::from_raw(&mut b"Content-Length: 10\r\n\r\n").unwrap())
} }
#[bench] #[bench]

View File

@@ -2,7 +2,7 @@
use std::borrow::Cow::{self, Borrowed, Owned}; use std::borrow::Cow::{self, Borrowed, Owned};
use std::borrow::IntoCow; use std::borrow::IntoCow;
use std::cmp::min; use std::cmp::min;
use std::old_io::{self, Reader, IoResult, BufWriter}; use std::io::{self, Read, Write, Cursor};
use std::num::from_u16; use std::num::from_u16;
use std::str; use std::str;
@@ -14,8 +14,8 @@ use status::StatusCode;
use uri; use uri;
use uri::RequestUri::{AbsolutePath, AbsoluteUri, Authority, Star}; use uri::RequestUri::{AbsolutePath, AbsoluteUri, Authority, Star};
use version::HttpVersion; use version::HttpVersion;
use version::HttpVersion::{Http09, Http10, Http11, Http20}; use version::HttpVersion::{Http09, Http10, Http11};
use HttpError::{HttpHeaderError, HttpIoError, HttpMethodError, HttpStatusError, use HttpError::{HttpHeaderError, HttpMethodError, HttpStatusError,
HttpUriError, HttpVersionError}; HttpUriError, HttpVersionError};
use HttpResult; use HttpResult;
@@ -52,10 +52,10 @@ pub enum HttpReader<R> {
EmptyReader(R), EmptyReader(R),
} }
impl<R: Reader> HttpReader<R> { impl<R: Read> HttpReader<R> {
/// Unwraps this HttpReader and returns the underlying Reader. /// Unwraps this HttpReader and returns the underlying Reader.
pub fn unwrap(self) -> R { pub fn into_inner(self) -> R {
match self { match self {
SizedReader(r, _) => r, SizedReader(r, _) => r,
ChunkedReader(r, _) => r, ChunkedReader(r, _) => r,
@@ -65,13 +65,13 @@ impl<R: Reader> HttpReader<R> {
} }
} }
impl<R: Reader> Reader for HttpReader<R> { impl<R: Read> Read for HttpReader<R> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self { match *self {
SizedReader(ref mut body, ref mut remaining) => { SizedReader(ref mut body, ref mut remaining) => {
debug!("Sized read, remaining={:?}", remaining); debug!("Sized read, remaining={:?}", remaining);
if *remaining == 0 { if *remaining == 0 {
Err(old_io::standard_error(old_io::EndOfFile)) Ok(0)
} else { } else {
let num = try!(body.read(buf)) as u64; let num = try!(body.read(buf)) as u64;
if num > *remaining { if num > *remaining {
@@ -97,7 +97,7 @@ impl<R: Reader> Reader for HttpReader<R> {
// if the 0 digit was missing from the stream, it would // if the 0 digit was missing from the stream, it would
// be an InvalidInput error instead. // be an InvalidInput error instead.
debug!("end of chunked"); debug!("end of chunked");
return Err(old_io::standard_error(old_io::EndOfFile)); return Ok(0)
} }
let to_read = min(rem as usize, buf.len()); let to_read = min(rem as usize, buf.len());
@@ -115,29 +115,44 @@ impl<R: Reader> Reader for HttpReader<R> {
EofReader(ref mut body) => { EofReader(ref mut body) => {
body.read(buf) body.read(buf)
}, },
EmptyReader(_) => Err(old_io::standard_error(old_io::EndOfFile)) EmptyReader(_) => Ok(0)
} }
} }
} }
fn eat<R: Reader>(rdr: &mut R, bytes: &[u8]) -> IoResult<()> { fn eat<R: Read>(rdr: &mut R, bytes: &[u8]) -> io::Result<()> {
let mut buf = [0];
for &b in bytes.iter() { for &b in bytes.iter() {
match try!(rdr.read_byte()) { match try!(rdr.read(&mut buf)) {
byte if byte == b => (), 1 if buf[0] == b => (),
_ => return Err(old_io::standard_error(old_io::InvalidInput)) _ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
"Invalid characters found",
None))
} }
} }
Ok(()) Ok(())
} }
/// Chunked chunks start with 1*HEXDIGIT, indicating the size of the chunk. /// Chunked chunks start with 1*HEXDIGIT, indicating the size of the chunk.
fn read_chunk_size<R: Reader>(rdr: &mut R) -> IoResult<u64> { fn read_chunk_size<R: Read>(rdr: &mut R) -> io::Result<u64> {
macro_rules! byte (
($rdr:ident) => ({
let mut buf = [0];
match try!($rdr.read(&mut buf)) {
1 => buf[0],
_ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
"Invalid chunk size line",
None)),
}
})
);
let mut size = 0u64; let mut size = 0u64;
let radix = 16; let radix = 16;
let mut in_ext = false; let mut in_ext = false;
let mut in_chunk_size = true; let mut in_chunk_size = true;
loop { loop {
match try!(rdr.read_byte()) { match byte!(rdr) {
b@b'0'...b'9' if in_chunk_size => { b@b'0'...b'9' if in_chunk_size => {
size *= radix; size *= radix;
size += (b - b'0') as u64; size += (b - b'0') as u64;
@@ -151,9 +166,12 @@ fn read_chunk_size<R: Reader>(rdr: &mut R) -> IoResult<u64> {
size += (b + 10 - b'A') as u64; size += (b + 10 - b'A') as u64;
}, },
CR => { CR => {
match try!(rdr.read_byte()) { match byte!(rdr) {
LF => break, LF => break,
_ => return Err(old_io::standard_error(old_io::InvalidInput)) _ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
"Invalid chunk size line",
None))
} }
}, },
// If we weren't in the extension yet, the ";" signals its start // If we weren't in the extension yet, the ";" signals its start
@@ -177,7 +195,9 @@ fn read_chunk_size<R: Reader>(rdr: &mut R) -> IoResult<u64> {
// Finally, if we aren't in the extension and we're reading any // Finally, if we aren't in the extension and we're reading any
// other octet, the chunk size line is invalid! // other octet, the chunk size line is invalid!
_ => { _ => {
return Err(old_io::standard_error(old_io::InvalidInput)); return Err(io::Error::new(io::ErrorKind::InvalidInput,
"Invalid chunk size line",
None))
} }
} }
} }
@@ -186,7 +206,7 @@ fn read_chunk_size<R: Reader>(rdr: &mut R) -> IoResult<u64> {
} }
/// Writers to handle different Transfer-Encodings. /// Writers to handle different Transfer-Encodings.
pub enum HttpWriter<W: Writer> { pub enum HttpWriter<W: Write> {
/// A no-op Writer, used initially before Transfer-Encoding is determined. /// A no-op Writer, used initially before Transfer-Encoding is determined.
ThroughWriter(W), ThroughWriter(W),
/// A Writer for when Transfer-Encoding includes `chunked`. /// A Writer for when Transfer-Encoding includes `chunked`.
@@ -199,10 +219,10 @@ pub enum HttpWriter<W: Writer> {
EmptyWriter(W), EmptyWriter(W),
} }
impl<W: Writer> HttpWriter<W> { impl<W: Write> HttpWriter<W> {
/// Unwraps the HttpWriter and returns the underlying Writer. /// Unwraps the HttpWriter and returns the underlying Writer.
#[inline] #[inline]
pub fn unwrap(self) -> W { pub fn into_inner(self) -> W {
match self { match self {
ThroughWriter(w) => w, ThroughWriter(w) => w,
ChunkedWriter(w) => w, ChunkedWriter(w) => w,
@@ -241,24 +261,25 @@ impl<W: Writer> HttpWriter<W> {
/// A final `write_all()` is called with an empty message, and then flushed. /// A final `write_all()` is called with an empty message, and then flushed.
/// The ChunkedWriter variant will use this to write the 0-sized last-chunk. /// The ChunkedWriter variant will use this to write the 0-sized last-chunk.
#[inline] #[inline]
pub fn end(mut self) -> IoResult<W> { pub fn end(mut self) -> io::Result<W> {
try!(self.write_all(&[])); try!(self.write(&[]));
try!(self.flush()); try!(self.flush());
Ok(self.unwrap()) Ok(self.into_inner())
} }
} }
impl<W: Writer> Writer for HttpWriter<W> { impl<W: Write> Write for HttpWriter<W> {
#[inline] #[inline]
fn write_all(&mut self, msg: &[u8]) -> IoResult<()> { fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
match *self { match *self {
ThroughWriter(ref mut w) => w.write_all(msg), ThroughWriter(ref mut w) => w.write(msg),
ChunkedWriter(ref mut w) => { ChunkedWriter(ref mut w) => {
let chunk_size = msg.len(); let chunk_size = msg.len();
debug!("chunked write, size = {:?}", chunk_size); debug!("chunked write, size = {:?}", chunk_size);
try!(write!(w, "{:X}{}", chunk_size, LINE_ENDING)); try!(write!(w, "{:X}{}", chunk_size, LINE_ENDING));
try!(w.write_all(msg)); try!(w.write_all(msg));
w.write_str(LINE_ENDING) try!(w.write_all(LINE_ENDING.as_bytes()));
Ok(msg.len())
}, },
SizedWriter(ref mut w, ref mut remaining) => { SizedWriter(ref mut w, ref mut remaining) => {
let len = msg.len() as u64; let len = msg.len() as u64;
@@ -266,29 +287,24 @@ impl<W: Writer> Writer for HttpWriter<W> {
let len = *remaining; let len = *remaining;
*remaining = 0; *remaining = 0;
try!(w.write_all(&msg[..len as usize])); try!(w.write_all(&msg[..len as usize]));
Err(old_io::standard_error(old_io::ShortWrite(len as usize))) Ok(len as usize)
} else { } else {
*remaining -= len; *remaining -= len;
w.write_all(msg) try!(w.write_all(msg));
Ok(len as usize)
} }
}, },
EmptyWriter(..) => { EmptyWriter(..) => {
let bytes = msg.len(); if msg.len() != 0 {
if bytes == 0 { error!("Cannot include a body with this kind of message");
Ok(())
} else {
Err(old_io::IoError {
kind: old_io::ShortWrite(bytes),
desc: "EmptyWriter cannot write any bytes",
detail: Some("Cannot include a body with this kind of message".to_string())
})
} }
Ok(0)
} }
} }
} }
#[inline] #[inline]
fn flush(&mut self) -> IoResult<()> { fn flush(&mut self) -> io::Result<()> {
match *self { match *self {
ThroughWriter(ref mut w) => w.flush(), ThroughWriter(ref mut w) => w.flush(),
ChunkedWriter(ref mut w) => w.flush(), ChunkedWriter(ref mut w) => w.flush(),
@@ -345,24 +361,36 @@ pub fn is_token(b: u8) -> bool {
/// otherwise returns any error encountered reading the stream. /// otherwise returns any error encountered reading the stream.
/// ///
/// The remaining contents of `buf` are left untouched. /// The remaining contents of `buf` are left untouched.
fn read_token_until_space<R: Reader>(stream: &mut R, buf: &mut [u8]) -> HttpResult<bool> { fn read_method_token_until_space<R: Read>(stream: &mut R, buf: &mut [u8]) -> HttpResult<bool> {
use std::old_io::BufWriter; macro_rules! byte (
let mut bufwrt = BufWriter::new(buf); ($rdr:ident) => ({
let mut slot = [0];
match try!($rdr.read(&mut slot)) {
1 => slot[0],
_ => return Err(HttpMethodError),
}
})
);
let mut cursor = Cursor::new(buf);
loop { loop {
let byte = try!(stream.read_byte()); let b = byte!(stream);
if byte == SP { if b == SP {
break; break;
} else if !is_token(byte) { } else if !is_token(b) {
return Err(HttpMethodError); return Err(HttpMethodError);
// Read to end but there's still more // Read to end but there's still more
} else if bufwrt.write_u8(byte).is_err() { } else {
return Ok(false); match cursor.write(&[b]) {
Ok(1) => (),
_ => return Ok(false)
}
} }
} }
if bufwrt.tell().unwrap() == 0 { if cursor.position() == 0 {
return Err(HttpMethodError); return Err(HttpMethodError);
} }
@@ -372,10 +400,10 @@ fn read_token_until_space<R: Reader>(stream: &mut R, buf: &mut [u8]) -> HttpResu
/// Read a `Method` from a raw stream, such as `GET`. /// Read a `Method` from a raw stream, such as `GET`.
/// ### Note: /// ### Note:
/// Extension methods are only parsed to 16 characters. /// Extension methods are only parsed to 16 characters.
pub fn read_method<R: Reader>(stream: &mut R) -> HttpResult<method::Method> { pub fn read_method<R: Read>(stream: &mut R) -> HttpResult<method::Method> {
let mut buf = [SP; 16]; let mut buf = [SP; 16];
if !try!(read_token_until_space(stream, &mut buf)) { if !try!(read_method_token_until_space(stream, &mut buf)) {
return Err(HttpMethodError); return Err(HttpMethodError);
} }
@@ -404,20 +432,29 @@ pub fn read_method<R: Reader>(stream: &mut R) -> HttpResult<method::Method> {
} }
/// Read a `RequestUri` from a raw stream. /// Read a `RequestUri` from a raw stream.
pub fn read_uri<R: Reader>(stream: &mut R) -> HttpResult<uri::RequestUri> { pub fn read_uri<R: Read>(stream: &mut R) -> HttpResult<uri::RequestUri> {
let mut b = try!(stream.read_byte()); macro_rules! byte (
($rdr:ident) => ({
let mut buf = [0];
match try!($rdr.read(&mut buf)) {
1 => buf[0],
_ => return Err(HttpUriError(UrlError::InvalidCharacter)),
}
})
);
let mut b = byte!(stream);
while b == SP { while b == SP {
b = try!(stream.read_byte()); b = byte!(stream);
} }
let mut s = String::new(); let mut s = String::new();
if b == STAR { if b == STAR {
try!(expect(stream.read_byte(), SP)); try!(expect(byte!(stream), SP));
return Ok(Star) return Ok(Star)
} else { } else {
s.push(b as char); s.push(b as char);
loop { loop {
match try!(stream.read_byte()) { match byte!(stream) {
SP => { SP => {
break; break;
}, },
@@ -448,32 +485,37 @@ pub fn read_uri<R: Reader>(stream: &mut R) -> HttpResult<uri::RequestUri> {
/// Read the `HttpVersion` from a raw stream, such as `HTTP/1.1`. /// Read the `HttpVersion` from a raw stream, such as `HTTP/1.1`.
pub fn read_http_version<R: Reader>(stream: &mut R) -> HttpResult<HttpVersion> { pub fn read_http_version<R: Read>(stream: &mut R) -> HttpResult<HttpVersion> {
try!(expect(stream.read_byte(), b'H')); macro_rules! byte (
try!(expect(stream.read_byte(), b'T')); ($rdr:ident) => ({
try!(expect(stream.read_byte(), b'T')); let mut buf = [0];
try!(expect(stream.read_byte(), b'P')); match try!($rdr.read(&mut buf)) {
try!(expect(stream.read_byte(), b'/')); 1 => buf[0],
_ => return Err(HttpVersionError),
}
})
);
match try!(stream.read_byte()) { try!(expect(byte!(stream), b'H'));
try!(expect(byte!(stream), b'T'));
try!(expect(byte!(stream), b'T'));
try!(expect(byte!(stream), b'P'));
try!(expect(byte!(stream), b'/'));
match byte!(stream) {
b'0' => { b'0' => {
try!(expect(stream.read_byte(), b'.')); try!(expect(byte!(stream), b'.'));
try!(expect(stream.read_byte(), b'9')); try!(expect(byte!(stream), b'9'));
Ok(Http09) Ok(Http09)
}, },
b'1' => { b'1' => {
try!(expect(stream.read_byte(), b'.')); try!(expect(byte!(stream), b'.'));
match try!(stream.read_byte()) { match byte!(stream) {
b'0' => Ok(Http10), b'0' => Ok(Http10),
b'1' => Ok(Http11), b'1' => Ok(Http11),
_ => Err(HttpVersionError) _ => Err(HttpVersionError)
} }
}, },
b'2' => {
try!(expect(stream.read_byte(), b'.'));
try!(expect(stream.read_byte(), b'0'));
Ok(Http20)
},
_ => Err(HttpVersionError) _ => Err(HttpVersionError)
} }
} }
@@ -507,14 +549,24 @@ pub type RawHeaderLine = (String, Vec<u8>);
/// > ; obsolete line folding /// > ; obsolete line folding
/// > ; see Section 3.2.4 /// > ; see Section 3.2.4
/// > ``` /// > ```
pub fn read_header<R: Reader>(stream: &mut R) -> HttpResult<Option<RawHeaderLine>> { pub fn read_header<R: Read>(stream: &mut R) -> HttpResult<Option<RawHeaderLine>> {
macro_rules! byte (
($rdr:ident) => ({
let mut buf = [0];
match try!($rdr.read(&mut buf)) {
1 => buf[0],
_ => return Err(HttpHeaderError),
}
})
);
let mut name = String::new(); let mut name = String::new();
let mut value = vec![]; let mut value = vec![];
loop { loop {
match try!(stream.read_byte()) { match byte!(stream) {
CR if name.len() == 0 => { CR if name.len() == 0 => {
match try!(stream.read_byte()) { match byte!(stream) {
LF => return Ok(None), LF => return Ok(None),
_ => return Err(HttpHeaderError) _ => return Err(HttpHeaderError)
} }
@@ -534,7 +586,7 @@ pub fn read_header<R: Reader>(stream: &mut R) -> HttpResult<Option<RawHeaderLine
todo!("handle obs-folding (gross!)"); todo!("handle obs-folding (gross!)");
loop { loop {
match try!(stream.read_byte()) { match byte!(stream) {
CR => break, CR => break,
LF => return Err(HttpHeaderError), LF => return Err(HttpHeaderError),
b' ' if ows => {}, b' ' if ows => {},
@@ -549,7 +601,7 @@ pub fn read_header<R: Reader>(stream: &mut R) -> HttpResult<Option<RawHeaderLine
let real_len = value.len() - value.iter().rev().take_while(|&&x| b' ' == x).count(); let real_len = value.len() - value.iter().rev().take_while(|&&x| b' ' == x).count();
value.truncate(real_len); value.truncate(real_len);
match try!(stream.read_byte()) { match byte!(stream) {
LF => Ok(Some((name, value))), LF => Ok(Some((name, value))),
_ => Err(HttpHeaderError) _ => Err(HttpHeaderError)
} }
@@ -560,7 +612,17 @@ pub fn read_header<R: Reader>(stream: &mut R) -> HttpResult<Option<RawHeaderLine
pub type RequestLine = (method::Method, uri::RequestUri, HttpVersion); pub type RequestLine = (method::Method, uri::RequestUri, HttpVersion);
/// Read the `RequestLine`, such as `GET / HTTP/1.1`. /// Read the `RequestLine`, such as `GET / HTTP/1.1`.
pub fn read_request_line<R: Reader>(stream: &mut R) -> HttpResult<RequestLine> { pub fn read_request_line<R: Read>(stream: &mut R) -> HttpResult<RequestLine> {
macro_rules! byte (
($rdr:ident) => ({
let mut buf = [0];
match try!($rdr.read(&mut buf)) {
1 => buf[0],
_ => return Err(HttpVersionError),
}
})
);
debug!("read request line"); debug!("read request line");
let method = try!(read_method(stream)); let method = try!(read_method(stream));
debug!("method = {:?}", method); debug!("method = {:?}", method);
@@ -569,10 +631,10 @@ pub fn read_request_line<R: Reader>(stream: &mut R) -> HttpResult<RequestLine> {
let version = try!(read_http_version(stream)); let version = try!(read_http_version(stream));
debug!("version = {:?}", version); debug!("version = {:?}", version);
if try!(stream.read_byte()) != CR { if byte!(stream) != CR {
return Err(HttpVersionError); return Err(HttpVersionError);
} }
if try!(stream.read_byte()) != LF { if byte!(stream) != LF {
return Err(HttpVersionError); return Err(HttpVersionError);
} }
@@ -606,9 +668,19 @@ impl Clone for RawStatus {
/// > status-code = 3DIGIT /// > status-code = 3DIGIT
/// > reason-phrase = *( HTAB / SP / VCHAR / obs-text ) /// > reason-phrase = *( HTAB / SP / VCHAR / obs-text )
/// >``` /// >```
pub fn read_status_line<R: Reader>(stream: &mut R) -> HttpResult<StatusLine> { pub fn read_status_line<R: Read>(stream: &mut R) -> HttpResult<StatusLine> {
macro_rules! byte (
($rdr:ident) => ({
let mut buf = [0];
match try!($rdr.read(&mut buf)) {
1 => buf[0],
_ => return Err(HttpVersionError),
}
})
);
let version = try!(read_http_version(stream)); let version = try!(read_http_version(stream));
if try!(stream.read_byte()) != SP { if byte!(stream) != SP {
return Err(HttpVersionError); return Err(HttpVersionError);
} }
let code = try!(read_status(stream)); let code = try!(read_status(stream));
@@ -617,11 +689,21 @@ pub fn read_status_line<R: Reader>(stream: &mut R) -> HttpResult<StatusLine> {
} }
/// Read the StatusCode from a stream. /// Read the StatusCode from a stream.
pub fn read_status<R: Reader>(stream: &mut R) -> HttpResult<RawStatus> { pub fn read_status<R: Read>(stream: &mut R) -> HttpResult<RawStatus> {
macro_rules! byte (
($rdr:ident) => ({
let mut buf = [0];
match try!($rdr.read(&mut buf)) {
1 => buf[0],
_ => return Err(HttpStatusError),
}
})
);
let code = [ let code = [
try!(stream.read_byte()), byte!(stream),
try!(stream.read_byte()), byte!(stream),
try!(stream.read_byte()), byte!(stream),
]; ];
let code = match str::from_utf8(code.as_slice()).ok().and_then(|x| x.parse().ok()) { let code = match str::from_utf8(code.as_slice()).ok().and_then(|x| x.parse().ok()) {
@@ -629,27 +711,25 @@ pub fn read_status<R: Reader>(stream: &mut R) -> HttpResult<RawStatus> {
None => return Err(HttpStatusError) None => return Err(HttpStatusError)
}; };
match try!(stream.read_byte()) { match byte!(stream) {
b' ' => (), b' ' => (),
_ => return Err(HttpStatusError) _ => return Err(HttpStatusError)
} }
let mut buf = [b' '; 32]; let mut buf = [SP; 32];
let mut cursor = Cursor::new(&mut buf[..]);
{ {
let mut bufwrt = BufWriter::new(&mut buf);
'read: loop { 'read: loop {
match try!(stream.read_byte()) { match byte!(stream) {
CR => match try!(stream.read_byte()) { CR => match byte!(stream) {
LF => break, LF => break,
_ => return Err(HttpStatusError) _ => return Err(HttpStatusError)
}, },
b => match bufwrt.write_u8(b) { b => match cursor.write(&[b]) {
Ok(_) => (), Ok(0) | Err(_) => {
Err(_) => {
for _ in 0u8..128 { for _ in 0u8..128 {
match try!(stream.read_byte()) { match byte!(stream) {
CR => match try!(stream.read_byte()) { CR => match byte!(stream) {
LF => break 'read, LF => break 'read,
_ => return Err(HttpStatusError) _ => return Err(HttpStatusError)
}, },
@@ -658,12 +738,13 @@ pub fn read_status<R: Reader>(stream: &mut R) -> HttpResult<RawStatus> {
} }
return Err(HttpStatusError) return Err(HttpStatusError)
} }
Ok(_) => (),
} }
} }
} }
} }
let reason = match str::from_utf8(&buf[..]) { let reason = match str::from_utf8(cursor.into_inner()) {
Ok(s) => s.trim(), Ok(s) => s.trim(),
Err(_) => return Err(HttpStatusError) Err(_) => return Err(HttpStatusError)
}; };
@@ -686,39 +767,34 @@ pub fn read_status<R: Reader>(stream: &mut R) -> HttpResult<RawStatus> {
} }
#[inline] #[inline]
fn expect(r: IoResult<u8>, expected: u8) -> HttpResult<()> { fn expect(actual: u8, expected: u8) -> HttpResult<()> {
match r { if actual == expected {
Ok(b) if b == expected => Ok(()), Ok(())
Ok(_) => Err(HttpVersionError), } else {
Err(e) => Err(HttpIoError(e)) Err(HttpVersionError)
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::old_io::{self, MemReader, MemWriter, IoResult}; use std::io::{self, Write};
use std::borrow::Cow::{Borrowed, Owned}; use std::borrow::Cow::{Borrowed, Owned};
use test::Bencher; use test::Bencher;
use uri::RequestUri; use uri::RequestUri;
use uri::RequestUri::{Star, AbsoluteUri, AbsolutePath, Authority}; use uri::RequestUri::{Star, AbsoluteUri, AbsolutePath, Authority};
use method; use method;
use version::HttpVersion; use version::HttpVersion;
use version::HttpVersion::{Http10, Http11, Http20}; use version::HttpVersion::{Http10, Http11};
use HttpError::{HttpVersionError, HttpMethodError}; use HttpError::{HttpVersionError, HttpMethodError};
use HttpResult; use HttpResult;
use url::Url; use url::Url;
use super::{read_method, read_uri, read_http_version, read_header, use super::{read_method, read_uri, read_http_version, read_header,
RawHeaderLine, read_status, RawStatus, read_chunk_size}; RawHeaderLine, read_status, RawStatus, read_chunk_size};
fn mem(s: &str) -> MemReader {
MemReader::new(s.as_bytes().to_vec())
}
#[test] #[test]
fn test_read_method() { fn test_read_method() {
fn read(s: &str, result: HttpResult<method::Method>) { fn read(s: &str, result: HttpResult<method::Method>) {
assert_eq!(read_method(&mut mem(s)), result); assert_eq!(read_method(&mut s.as_bytes()), result);
} }
read("GET /", Ok(method::Method::Get)); read("GET /", Ok(method::Method::Get));
@@ -737,7 +813,7 @@ mod tests {
#[test] #[test]
fn test_read_uri() { fn test_read_uri() {
fn read(s: &str, result: HttpResult<RequestUri>) { fn read(s: &str, result: HttpResult<RequestUri>) {
assert_eq!(read_uri(&mut mem(s)), result); assert_eq!(read_uri(&mut s.as_bytes()), result);
} }
read("* ", Ok(Star)); read("* ", Ok(Star));
@@ -749,12 +825,11 @@ mod tests {
#[test] #[test]
fn test_read_http_version() { fn test_read_http_version() {
fn read(s: &str, result: HttpResult<HttpVersion>) { fn read(s: &str, result: HttpResult<HttpVersion>) {
assert_eq!(read_http_version(&mut mem(s)), result); assert_eq!(read_http_version(&mut s.as_bytes()), result);
} }
read("HTTP/1.0", Ok(Http10)); read("HTTP/1.0", Ok(Http10));
read("HTTP/1.1", Ok(Http11)); read("HTTP/1.1", Ok(Http11));
read("HTTP/2.0", Ok(Http20));
read("HTP/2.0", Err(HttpVersionError)); read("HTP/2.0", Err(HttpVersionError));
read("HTTP.2.0", Err(HttpVersionError)); read("HTTP.2.0", Err(HttpVersionError));
read("HTTP 2.0", Err(HttpVersionError)); read("HTTP 2.0", Err(HttpVersionError));
@@ -764,11 +839,11 @@ mod tests {
#[test] #[test]
fn test_read_status() { fn test_read_status() {
fn read(s: &str, result: HttpResult<RawStatus>) { fn read(s: &str, result: HttpResult<RawStatus>) {
assert_eq!(read_status(&mut mem(s)), result); assert_eq!(read_status(&mut s.as_bytes()), result);
} }
fn read_ignore_string(s: &str, result: HttpResult<RawStatus>) { fn read_ignore_string(s: &str, result: HttpResult<RawStatus>) {
match (read_status(&mut mem(s)), result) { match (read_status(&mut s.as_bytes()), result) {
(Ok(RawStatus(ref c1, _)), Ok(RawStatus(ref c2, _))) => { (Ok(RawStatus(ref c1, _)), Ok(RawStatus(ref c2, _))) => {
assert_eq!(c1, c2); assert_eq!(c1, c2);
}, },
@@ -788,7 +863,7 @@ mod tests {
#[test] #[test]
fn test_read_header() { fn test_read_header() {
fn read(s: &str, result: HttpResult<Option<RawHeaderLine>>) { fn read(s: &str, result: HttpResult<Option<RawHeaderLine>>) {
assert_eq!(read_header(&mut mem(s)), result); assert_eq!(read_header(&mut s.as_bytes()), result);
} }
read("Host: rust-lang.org\r\n", Ok(Some(("Host".to_string(), read("Host: rust-lang.org\r\n", Ok(Some(("Host".to_string(),
@@ -798,10 +873,10 @@ mod tests {
#[test] #[test]
fn test_write_chunked() { fn test_write_chunked() {
use std::str::from_utf8; use std::str::from_utf8;
let mut w = super::HttpWriter::ChunkedWriter(MemWriter::new()); let mut w = super::HttpWriter::ChunkedWriter(Vec::new());
w.write_all(b"foo bar").unwrap(); w.write_all(b"foo bar").unwrap();
w.write_all(b"baz quux herp").unwrap(); w.write_all(b"baz quux herp").unwrap();
let buf = w.end().unwrap().into_inner(); let buf = w.end().unwrap();
let s = from_utf8(buf.as_slice()).unwrap(); let s = from_utf8(buf.as_slice()).unwrap();
assert_eq!(s, "7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n"); assert_eq!(s, "7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n");
} }
@@ -809,19 +884,23 @@ mod tests {
#[test] #[test]
fn test_write_sized() { fn test_write_sized() {
use std::str::from_utf8; use std::str::from_utf8;
let mut w = super::HttpWriter::SizedWriter(MemWriter::new(), 8); let mut w = super::HttpWriter::SizedWriter(Vec::new(), 8);
w.write_all(b"foo bar").unwrap(); w.write_all(b"foo bar").unwrap();
assert_eq!(w.write_all(b"baz"), Err(old_io::standard_error(old_io::ShortWrite(1)))); assert_eq!(w.write(b"baz"), Ok(1));
let buf = w.end().unwrap().into_inner(); let buf = w.end().unwrap();
let s = from_utf8(buf.as_slice()).unwrap(); let s = from_utf8(buf.as_slice()).unwrap();
assert_eq!(s, "foo barb"); assert_eq!(s, "foo barb");
} }
#[test] #[test]
fn test_read_chunk_size() { fn test_read_chunk_size() {
fn read(s: &str, result: IoResult<u64>) { fn read(s: &str, result: io::Result<u64>) {
assert_eq!(read_chunk_size(&mut mem(s)), result); assert_eq!(read_chunk_size(&mut s.as_bytes()), result);
}
fn read_err(s: &str) {
assert_eq!(read_chunk_size(&mut s.as_bytes()).unwrap_err().kind(), io::ErrorKind::InvalidInput);
} }
read("1\r\n", Ok(1)); read("1\r\n", Ok(1));
@@ -833,13 +912,13 @@ mod tests {
read("Ff\r\n", Ok(255)); read("Ff\r\n", Ok(255));
read("Ff \r\n", Ok(255)); read("Ff \r\n", Ok(255));
// Missing LF or CRLF // Missing LF or CRLF
read("F\rF", Err(old_io::standard_error(old_io::InvalidInput))); read_err("F\rF");
read("F", Err(old_io::standard_error(old_io::EndOfFile))); read_err("F");
// Invalid hex digit // Invalid hex digit
read("X\r\n", Err(old_io::standard_error(old_io::InvalidInput))); read_err("X\r\n");
read("1X\r\n", Err(old_io::standard_error(old_io::InvalidInput))); read_err("1X\r\n");
read("-\r\n", Err(old_io::standard_error(old_io::InvalidInput))); read_err("-\r\n");
read("-1\r\n", Err(old_io::standard_error(old_io::InvalidInput))); read_err("-1\r\n");
// Acceptable (if not fully valid) extensions do not influence the size // Acceptable (if not fully valid) extensions do not influence the size
read("1;extension\r\n", Ok(1)); read("1;extension\r\n", Ok(1));
read("a;ext name=value\r\n", Ok(10)); read("a;ext name=value\r\n", Ok(10));
@@ -850,21 +929,21 @@ mod tests {
read("3 ;\r\n", Ok(3)); read("3 ;\r\n", Ok(3));
read("3 ; \r\n", Ok(3)); read("3 ; \r\n", Ok(3));
// Invalid extensions cause an error // Invalid extensions cause an error
read("1 invalid extension\r\n", Err(old_io::standard_error(old_io::InvalidInput))); read_err("1 invalid extension\r\n");
read("1 A\r\n", Err(old_io::standard_error(old_io::InvalidInput))); read_err("1 A\r\n");
read("1;no CRLF", Err(old_io::standard_error(old_io::EndOfFile))); read_err("1;no CRLF");
} }
#[bench] #[bench]
fn bench_read_method(b: &mut Bencher) { fn bench_read_method(b: &mut Bencher) {
b.bytes = b"CONNECT ".len() as u64; b.bytes = b"CONNECT ".len() as u64;
b.iter(|| assert_eq!(read_method(&mut mem("CONNECT ")), Ok(method::Method::Connect))); b.iter(|| assert_eq!(read_method(&mut b"CONNECT "), Ok(method::Method::Connect)));
} }
#[bench] #[bench]
fn bench_read_status(b: &mut Bencher) { fn bench_read_status(b: &mut Bencher) {
b.bytes = b"404 Not Found\r\n".len() as u64; b.bytes = b"404 Not Found\r\n".len() as u64;
b.iter(|| assert_eq!(read_status(&mut mem("404 Not Found\r\n")), Ok(RawStatus(404, Borrowed("Not Found"))))); b.iter(|| assert_eq!(read_status(&mut b"404 Not Found\r\n"), Ok(RawStatus(404, Borrowed("Not Found")))));
} }
} }

View File

@@ -1,6 +1,6 @@
#![feature(core, collections, io, old_io, os, old_path, #![feature(core, collections, io, net, os, path,
std_misc, box_syntax, unsafe_destructor)] std_misc, box_syntax, unsafe_destructor)]
#![deny(missing_docs)] #![cfg_attr(test, deny(missing_docs))]
#![cfg_attr(test, deny(warnings))] #![cfg_attr(test, deny(warnings))]
#![cfg_attr(test, feature(alloc, test))] #![cfg_attr(test, feature(alloc, test))]
@@ -140,7 +140,7 @@ extern crate log;
#[cfg(test)] #[cfg(test)]
extern crate test; extern crate test;
pub use std::old_io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr, Port};
pub use mimewrapper::mime; pub use mimewrapper::mime;
pub use url::Url; pub use url::Url;
pub use client::Client; pub use client::Client;
@@ -150,7 +150,7 @@ pub use server::Server;
use std::error::{Error, FromError}; use std::error::{Error, FromError};
use std::fmt; use std::fmt;
use std::old_io::IoError; use std::io::Error as IoError;
use self::HttpError::{HttpMethodError, HttpUriError, HttpVersionError, use self::HttpError::{HttpMethodError, HttpUriError, HttpVersionError,
HttpHeaderError, HttpStatusError, HttpIoError}; HttpHeaderError, HttpStatusError, HttpIoError};
@@ -164,7 +164,7 @@ macro_rules! todo(
macro_rules! inspect( macro_rules! inspect(
($name:expr, $value:expr) => ({ ($name:expr, $value:expr) => ({
let v = $value; let v = $value;
debug!("inspect: {:?} = {:?}", $name, v); trace!("inspect: {:?} = {:?}", $name, v);
v v
}) })
); );

View File

@@ -1,67 +1,69 @@
use std::fmt; use std::fmt;
use std::old_io::{IoResult, MemReader, MemWriter}; use std::io::{self, Read, Write, Cursor};
use std::old_io::net::ip::SocketAddr; use std::net::SocketAddr;
use net::{NetworkStream, NetworkConnector}; use net::{NetworkStream, NetworkConnector};
pub struct MockStream { pub struct MockStream {
pub read: MemReader, pub read: Cursor<Vec<u8>>,
pub write: MemWriter, pub write: Vec<u8>,
} }
impl Clone for MockStream { impl Clone for MockStream {
fn clone(&self) -> MockStream { fn clone(&self) -> MockStream {
MockStream { MockStream {
read: MemReader::new(self.read.get_ref().to_vec()), read: Cursor::new(self.read.get_ref().clone()),
write: MemWriter::from_vec(self.write.get_ref().to_vec()), write: self.write.clone()
} }
} }
} }
impl PartialEq for MockStream {
fn eq(&self, other: &MockStream) -> bool {
self.read.get_ref() == other.read.get_ref() &&
self.write.get_ref() == other.write.get_ref()
}
}
impl fmt::Debug for MockStream { impl fmt::Debug for MockStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "MockStream {{ read: {:?}, write: {:?} }}", write!(f, "MockStream {{ read: {:?}, write: {:?} }}", self.read.get_ref(), self.write)
self.read.get_ref(), self.write.get_ref())
} }
}
impl PartialEq for MockStream {
fn eq(&self, other: &MockStream) -> bool {
self.read.get_ref() == other.read.get_ref() && self.write == other.write
}
} }
impl MockStream { impl MockStream {
pub fn new() -> MockStream { pub fn new() -> MockStream {
MockStream { MockStream {
read: MemReader::new(vec![]), read: Cursor::new(vec![]),
write: MemWriter::new(), write: vec![],
} }
} }
pub fn with_input(input: &[u8]) -> MockStream { pub fn with_input(input: &[u8]) -> MockStream {
MockStream { MockStream {
read: MemReader::new(input.to_vec()), read: Cursor::new(input.to_vec()),
write: MemWriter::new(), write: vec![]
} }
} }
} }
impl Reader for MockStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { impl Read for MockStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.read.read(buf) self.read.read(buf)
} }
} }
impl Writer for MockStream { impl Write for MockStream {
fn write_all(&mut self, msg: &[u8]) -> IoResult<()> { fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
self.write.write_all(msg) Write::write(&mut self.write, msg)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
} }
} }
impl NetworkStream for MockStream { impl NetworkStream for MockStream {
fn peer_name(&mut self) -> IoResult<SocketAddr> { fn peer_addr(&mut self) -> io::Result<SocketAddr> {
Ok("127.0.0.1:1337".parse().unwrap()) Ok("127.0.0.1:1337".parse().unwrap())
} }
} }
@@ -71,7 +73,7 @@ pub struct MockConnector;
impl NetworkConnector for MockConnector { impl NetworkConnector for MockConnector {
type Stream = MockStream; type Stream = MockStream;
fn connect(&mut self, _host: &str, _port: u16, _scheme: &str) -> IoResult<MockStream> { fn connect(&mut self, _host: &str, _port: u16, _scheme: &str) -> io::Result<MockStream> {
Ok(MockStream::new()) Ok(MockStream::new())
} }
} }
@@ -86,8 +88,9 @@ macro_rules! mock_connector (
impl ::net::NetworkConnector for $name { impl ::net::NetworkConnector for $name {
type Stream = ::mock::MockStream; type Stream = ::mock::MockStream;
fn connect(&mut self, host: &str, port: u16, scheme: &str) -> ::std::old_io::IoResult<::mock::MockStream> { fn connect(&mut self, host: &str, port: u16, scheme: &str) -> ::std::io::Result<::mock::MockStream> {
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Cursor;
debug!("MockStream::connect({:?}, {:?}, {:?})", host, port, scheme); debug!("MockStream::connect({:?}, {:?}, {:?})", host, port, scheme);
let mut map = HashMap::new(); let mut map = HashMap::new();
$(map.insert($url, $res);)* $(map.insert($url, $res);)*
@@ -97,8 +100,8 @@ macro_rules! mock_connector (
// ignore port for now // ignore port for now
match map.get(&*key) { match map.get(&*key) {
Some(res) => Ok(::mock::MockStream { Some(res) => Ok(::mock::MockStream {
write: ::std::old_io::MemWriter::new(), write: vec![],
read: ::std::old_io::MemReader::new(res.to_string().into_bytes()) read: Cursor::new(res.to_string().into_bytes()),
}), }),
None => panic!("{:?} doesn't know url {}", stringify!($name), key) None => panic!("{:?} doesn't know url {}", stringify!($name), key)
} }

View File

@@ -1,11 +1,10 @@
//! A collection of traits abstracting over Listeners and Streams. //! A collection of traits abstracting over Listeners and Streams.
use std::any::{Any, TypeId}; use std::any::{Any, TypeId};
use std::fmt; use std::fmt;
use std::old_io::{IoResult, IoError, ConnectionAborted, InvalidInput, OtherIoError, use std::io::{self, Read, Write};
Stream, Listener, Acceptor}; use std::net::{SocketAddr, ToSocketAddrs, TcpStream, TcpListener};
use std::old_io::net::ip::{SocketAddr, ToSocketAddr, Port};
use std::old_io::net::tcp::{TcpStream, TcpListener, TcpAcceptor};
use std::mem; use std::mem;
use std::path::Path;
use std::raw::{self, TraitObject}; use std::raw::{self, TraitObject};
use std::sync::Arc; use std::sync::Arc;
@@ -24,34 +23,26 @@ macro_rules! try_some {
} }
/// The write-status indicating headers have not been written. /// The write-status indicating headers have not been written.
#[allow(missing_copy_implementations)] pub enum Fresh {}
pub struct Fresh;
/// The write-status indicating headers have been written. /// The write-status indicating headers have been written.
#[allow(missing_copy_implementations)] pub enum Streaming {}
pub struct Streaming;
/// An abstraction to listen for connections on a certain port. /// An abstraction to listen for connections on a certain port.
pub trait NetworkListener { pub trait NetworkListener: Clone {
/// Type of Acceptor /// The stream produced for each connection.
type Acceptor: NetworkAcceptor;
/// Listens on a socket.
fn listen<To: ToSocketAddr>(&mut self, addr: To) -> IoResult<Self::Acceptor>;
}
/// An abstraction to receive `NetworkStream`s.
pub trait NetworkAcceptor: Clone + Send {
/// Type of Stream to receive
type Stream: NetworkStream + Send + Clone; type Stream: NetworkStream + Send + Clone;
/// Listens on a socket.
//fn listen<To: ToSocketAddrs>(&mut self, addr: To) -> io::Result<Self::Acceptor>;
/// Returns an iterator of streams. /// Returns an iterator of streams.
fn accept(&mut self) -> IoResult<Self::Stream>; fn accept(&mut self) -> io::Result<Self::Stream>;
/// Get the address this Listener ended up listening on. /// Get the address this Listener ended up listening on.
fn socket_name(&self) -> IoResult<SocketAddr>; fn socket_addr(&mut self) -> io::Result<SocketAddr>;
/// Closes the Acceptor, so no more incoming connections will be handled. /// Closes the Acceptor, so no more incoming connections will be handled.
fn close(&mut self) -> IoResult<()>; // fn close(&mut self) -> io::Result<()>;
/// Returns an iterator over incoming connections. /// Returns an iterator over incoming connections.
fn incoming(&mut self) -> NetworkConnections<Self> { fn incoming(&mut self) -> NetworkConnections<Self> {
@@ -60,20 +51,20 @@ pub trait NetworkAcceptor: Clone + Send {
} }
/// An iterator wrapper over a NetworkAcceptor. /// An iterator wrapper over a NetworkAcceptor.
pub struct NetworkConnections<'a, N: NetworkAcceptor + 'a>(&'a mut N); pub struct NetworkConnections<'a, N: NetworkListener + 'a>(&'a mut N);
impl<'a, N: NetworkAcceptor> Iterator for NetworkConnections<'a, N> { impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> {
type Item = IoResult<N::Stream>; type Item = io::Result<N::Stream>;
fn next(&mut self) -> Option<IoResult<N::Stream>> { fn next(&mut self) -> Option<io::Result<N::Stream>> {
Some(self.0.accept()) Some(self.0.accept())
} }
} }
/// An abstraction over streams that a Server can utilize. /// An abstraction over streams that a Server can utilize.
pub trait NetworkStream: Stream + Any + StreamClone + Send { pub trait NetworkStream: Read + Write + Any + StreamClone + Send {
/// Get the remote address of the underlying connection. /// Get the remote address of the underlying connection.
fn peer_name(&mut self) -> IoResult<SocketAddr>; fn peer_addr(&mut self) -> io::Result<SocketAddr>;
} }
@@ -94,7 +85,7 @@ pub trait NetworkConnector {
/// Type of Stream to create /// Type of Stream to create
type Stream: NetworkStream + Send; type Stream: NetworkStream + Send;
/// Connect to a remote address. /// Connect to a remote address.
fn connect(&mut self, host: &str, port: Port, scheme: &str) -> IoResult<Self::Stream>; fn connect(&mut self, host: &str, port: u16, scheme: &str) -> io::Result<Self::Stream>;
} }
impl fmt::Debug for Box<NetworkStream + Send> { impl fmt::Debug for Box<NetworkStream + Send> {
@@ -108,32 +99,6 @@ impl Clone for Box<NetworkStream + Send> {
fn clone(&self) -> Box<NetworkStream + Send> { self.clone_box() } fn clone(&self) -> Box<NetworkStream + Send> { self.clone_box() }
} }
impl Reader for Box<NetworkStream + Send> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { (**self).read(buf) }
}
impl Writer for Box<NetworkStream + Send> {
#[inline]
fn write_all(&mut self, msg: &[u8]) -> IoResult<()> { (**self).write_all(msg) }
#[inline]
fn flush(&mut self) -> IoResult<()> { (**self).flush() }
}
impl<'a> Reader for &'a mut NetworkStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { (**self).read(buf) }
}
impl<'a> Writer for &'a mut NetworkStream {
#[inline]
fn write_all(&mut self, msg: &[u8]) -> IoResult<()> { (**self).write_all(msg) }
#[inline]
fn flush(&mut self) -> IoResult<()> { (**self).flush() }
}
impl UnsafeAnyExt for NetworkStream { impl UnsafeAnyExt for NetworkStream {
unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T { unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
mem::transmute(mem::transmute::<&NetworkStream, mem::transmute(mem::transmute::<&NetworkStream,
@@ -191,63 +156,57 @@ impl NetworkStream {
} }
/// A `NetworkListener` for `HttpStream`s. /// A `NetworkListener` for `HttpStream`s.
#[allow(missing_copy_implementations)]
pub enum HttpListener { pub enum HttpListener {
/// Http variant. /// Http variant.
Http, Http(TcpListener),
/// Https variant. The two paths point to the certificate and key PEM files, in that order. /// Https variant. The two paths point to the certificate and key PEM files, in that order.
Https(Path, Path), Https(TcpListener, Arc<SslContext>)
} }
impl NetworkListener for HttpListener { impl Clone for HttpListener {
type Acceptor = HttpAcceptor; fn clone(&self) -> HttpListener {
match *self {
#[inline] HttpListener::Http(ref tcp) => HttpListener::Http(tcp.try_clone().unwrap()),
fn listen<To: ToSocketAddr>(&mut self, addr: To) -> IoResult<HttpAcceptor> { HttpListener::Https(ref tcp, ref ssl) => HttpListener::Https(tcp.try_clone().unwrap(), ssl.clone()),
let mut tcp = try!(TcpListener::bind(addr)); }
let addr = try!(tcp.socket_name());
Ok(match *self {
HttpListener::Http => HttpAcceptor::Http(try!(tcp.listen()), addr),
HttpListener::Https(ref cert, ref key) => {
let mut ssl_context = try!(SslContext::new(Sslv23).map_err(lift_ssl_error));
try_some!(ssl_context.set_cipher_list("DEFAULT").map(lift_ssl_error));
try_some!(ssl_context.set_certificate_file(
cert, X509FileType::PEM).map(lift_ssl_error));
try_some!(ssl_context.set_private_key_file(
key, X509FileType::PEM).map(lift_ssl_error));
ssl_context.set_verify(SslVerifyNone, None);
HttpAcceptor::Https(try!(tcp.listen()), addr, Arc::new(ssl_context))
}
})
} }
} }
/// A `NetworkAcceptor` for `HttpStream`s. impl HttpListener {
#[derive(Clone)]
pub enum HttpAcceptor { /// Start listening to an address over HTTP.
/// Http variant. pub fn http<To: ToSocketAddrs>(addr: &To) -> io::Result<HttpListener> {
Http(TcpAcceptor, SocketAddr), Ok(HttpListener::Http(try!(TcpListener::bind(addr))))
/// Https variant. }
Https(TcpAcceptor, SocketAddr, Arc<SslContext>),
/// Start listening to an address over HTTPS.
pub fn https<To: ToSocketAddrs>(addr: &To, cert: &Path, key: &Path) -> io::Result<HttpListener> {
let mut ssl_context = try!(SslContext::new(Sslv23).map_err(lift_ssl_error));
try_some!(ssl_context.set_cipher_list("DEFAULT").map(lift_ssl_error));
try_some!(ssl_context.set_certificate_file(
cert, X509FileType::PEM).map(lift_ssl_error));
try_some!(ssl_context.set_private_key_file(
key, X509FileType::PEM).map(lift_ssl_error));
ssl_context.set_verify(SslVerifyNone, None);
Ok(HttpListener::Https(try!(TcpListener::bind(addr)), Arc::new(ssl_context)))
}
} }
impl NetworkAcceptor for HttpAcceptor { impl NetworkListener for HttpListener {
type Stream = HttpStream; type Stream = HttpStream;
#[inline] #[inline]
fn accept(&mut self) -> IoResult<HttpStream> { fn accept(&mut self) -> io::Result<HttpStream> {
Ok(match *self { Ok(match *self {
HttpAcceptor::Http(ref mut tcp, _) => HttpStream::Http(try!(tcp.accept())), HttpListener::Http(ref mut tcp) => HttpStream::Http(CloneTcpStream(try!(tcp.accept()).0)),
HttpAcceptor::Https(ref mut tcp, _, ref ssl_context) => { HttpListener::Https(ref mut tcp, ref ssl_context) => {
let stream = try!(tcp.accept()); let stream = CloneTcpStream(try!(tcp.accept()).0);
match SslStream::<TcpStream>::new_server(&**ssl_context, stream) { match SslStream::new_server(&**ssl_context, stream) {
Ok(ssl_stream) => HttpStream::Https(ssl_stream), Ok(ssl_stream) => HttpStream::Https(ssl_stream),
Err(StreamError(ref e)) => { Err(StreamError(ref e)) => {
return Err(IoError { return Err(io::Error::new(io::ErrorKind::ConnectionAborted,
kind: ConnectionAborted, "SSL Handshake Interrupted",
desc: "SSL Handshake Interrupted", Some(e.to_string())));
detail: Some(e.desc.to_string())
});
}, },
Err(e) => return Err(lift_ssl_error(e)) Err(e) => return Err(lift_ssl_error(e))
} }
@@ -256,19 +215,39 @@ impl NetworkAcceptor for HttpAcceptor {
} }
#[inline] #[inline]
fn close(&mut self) -> IoResult<()> { fn socket_addr(&mut self) -> io::Result<SocketAddr> {
match *self { match *self {
HttpAcceptor::Http(ref mut tcp, _) => tcp.close_accept(), HttpListener::Http(ref mut tcp) => tcp.socket_addr(),
HttpAcceptor::Https(ref mut tcp, _, _) => tcp.close_accept(), HttpListener::Https(ref mut tcp, _) => tcp.socket_addr(),
} }
} }
}
#[doc(hidden)]
pub struct CloneTcpStream(TcpStream);
impl Clone for CloneTcpStream{
#[inline] #[inline]
fn socket_name(&self) -> IoResult<SocketAddr> { fn clone(&self) -> CloneTcpStream {
match *self { CloneTcpStream(self.0.try_clone().unwrap())
HttpAcceptor::Http(_, addr) => Ok(addr), }
HttpAcceptor::Https(_, addr, _) => Ok(addr), }
}
impl Read for CloneTcpStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
}
impl Write for CloneTcpStream {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
} }
} }
@@ -276,14 +255,14 @@ impl NetworkAcceptor for HttpAcceptor {
#[derive(Clone)] #[derive(Clone)]
pub enum HttpStream { pub enum HttpStream {
/// A stream over the HTTP protocol. /// A stream over the HTTP protocol.
Http(TcpStream), Http(CloneTcpStream),
/// A stream over the HTTP protocol, protected by SSL. /// A stream over the HTTP protocol, protected by SSL.
Https(SslStream<TcpStream>), Https(SslStream<CloneTcpStream>),
} }
impl Reader for HttpStream { impl Read for HttpStream {
#[inline] #[inline]
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self { match *self {
HttpStream::Http(ref mut inner) => inner.read(buf), HttpStream::Http(ref mut inner) => inner.read(buf),
HttpStream::Https(ref mut inner) => inner.read(buf) HttpStream::Https(ref mut inner) => inner.read(buf)
@@ -291,16 +270,16 @@ impl Reader for HttpStream {
} }
} }
impl Writer for HttpStream { impl Write for HttpStream {
#[inline] #[inline]
fn write_all(&mut self, msg: &[u8]) -> IoResult<()> { fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
match *self { match *self {
HttpStream::Http(ref mut inner) => inner.write_all(msg), HttpStream::Http(ref mut inner) => inner.write(msg),
HttpStream::Https(ref mut inner) => inner.write_all(msg) HttpStream::Https(ref mut inner) => inner.write(msg)
} }
} }
#[inline] #[inline]
fn flush(&mut self) -> IoResult<()> { fn flush(&mut self) -> io::Result<()> {
match *self { match *self {
HttpStream::Http(ref mut inner) => inner.flush(), HttpStream::Http(ref mut inner) => inner.flush(),
HttpStream::Https(ref mut inner) => inner.flush(), HttpStream::Https(ref mut inner) => inner.flush(),
@@ -309,10 +288,10 @@ impl Writer for HttpStream {
} }
impl NetworkStream for HttpStream { impl NetworkStream for HttpStream {
fn peer_name(&mut self) -> IoResult<SocketAddr> { fn peer_addr(&mut self) -> io::Result<SocketAddr> {
match *self { match *self {
HttpStream::Http(ref mut inner) => inner.peer_name(), HttpStream::Http(ref mut inner) => inner.0.peer_addr(),
HttpStream::Https(ref mut inner) => inner.get_mut().peer_name() HttpStream::Https(ref mut inner) => inner.get_mut().0.peer_addr()
} }
} }
} }
@@ -327,16 +306,16 @@ pub type ContextVerifier<'v> = Box<FnMut(&mut SslContext) -> ()+'v>;
impl<'v> NetworkConnector for HttpConnector<'v> { impl<'v> NetworkConnector for HttpConnector<'v> {
type Stream = HttpStream; type Stream = HttpStream;
fn connect(&mut self, host: &str, port: Port, scheme: &str) -> IoResult<HttpStream> { fn connect(&mut self, host: &str, port: u16, scheme: &str) -> io::Result<HttpStream> {
let addr = (host, port); let addr = &(host, port);
match scheme { match scheme {
"http" => { "http" => {
debug!("http scheme"); debug!("http scheme");
Ok(HttpStream::Http(try!(TcpStream::connect(addr)))) Ok(HttpStream::Http(CloneTcpStream(try!(TcpStream::connect(addr)))))
}, },
"https" => { "https" => {
debug!("https scheme"); debug!("https scheme");
let stream = try!(TcpStream::connect(addr)); let stream = CloneTcpStream(try!(TcpStream::connect(addr)));
let mut context = try!(SslContext::new(Sslv23).map_err(lift_ssl_error)); let mut context = try!(SslContext::new(Sslv23).map_err(lift_ssl_error));
if let Some(ref mut verifier) = self.0 { if let Some(ref mut verifier) = self.0 {
verifier(&mut context); verifier(&mut context);
@@ -347,32 +326,26 @@ impl<'v> NetworkConnector for HttpConnector<'v> {
Ok(HttpStream::Https(stream)) Ok(HttpStream::Https(stream))
}, },
_ => { _ => {
Err(IoError { Err(io::Error::new(io::ErrorKind::InvalidInput,
kind: InvalidInput, "Invalid scheme for Http",
desc: "Invalid scheme for Http", None))
detail: None
})
} }
} }
} }
} }
fn lift_ssl_error(ssl: SslError) -> IoError { fn lift_ssl_error(ssl: SslError) -> io::Error {
debug!("lift_ssl_error: {:?}", ssl); debug!("lift_ssl_error: {:?}", ssl);
match ssl { match ssl {
StreamError(err) => err, StreamError(err) => err,
SslSessionClosed => IoError { SslSessionClosed => io::Error::new(io::ErrorKind::ConnectionAborted,
kind: ConnectionAborted, "SSL Connection Closed",
desc: "SSL Connection Closed", None),
detail: None
},
// Unfortunately throw this away. No way to support this // Unfortunately throw this away. No way to support this
// detail without a better Error abstraction. // detail without a better Error abstraction.
OpenSslErrors(errs) => IoError { OpenSslErrors(errs) => io::Error::new(io::ErrorKind::Other,
kind: OtherIoError, "Error in OpenSSL",
desc: "Error in OpenSSL", Some(format!("{:?}", errs)))
detail: Some(format!("{:?}", errs))
}
} }
} }

View File

@@ -1,16 +1,16 @@
use std::thread::{self, JoinGuard}; use std::thread::{self, JoinGuard};
use std::sync::mpsc; use std::sync::mpsc;
use std::collections::VecMap; use std::collections::VecMap;
use net::NetworkAcceptor; use net::NetworkListener;
pub struct AcceptorPool<A: NetworkAcceptor> { pub struct ListenerPool<A: NetworkListener> {
acceptor: A acceptor: A
} }
impl<'a, A: NetworkAcceptor + 'a> AcceptorPool<A> { impl<'a, A: NetworkListener + Send + 'a> ListenerPool<A> {
/// Create a thread pool to manage the acceptor. /// Create a thread pool to manage the acceptor.
pub fn new(acceptor: A) -> AcceptorPool<A> { pub fn new(acceptor: A) -> ListenerPool<A> {
AcceptorPool { acceptor: acceptor } ListenerPool { acceptor: acceptor }
} }
/// Runs the acceptor pool. Blocks until the acceptors are closed. /// Runs the acceptor pool. Blocks until the acceptors are closed.
@@ -44,23 +44,16 @@ impl<'a, A: NetworkAcceptor + 'a> AcceptorPool<A> {
} }
} }
fn spawn_with<'a, A, F>(supervisor: mpsc::Sender<usize>, work: &'a F, mut acceptor: A, id: usize) -> JoinGuard<'a, ()> fn spawn_with<'a, A, F>(supervisor: mpsc::Sender<usize>, work: &'a F, mut acceptor: A, id: usize) -> thread::JoinGuard<'a, ()>
where A: NetworkAcceptor + 'a, where A: NetworkListener + Send + 'a,
F: Fn(<A as NetworkAcceptor>::Stream) + Send + Sync + 'a { F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'a {
use std::old_io::EndOfFile;
thread::scoped(move || { thread::scoped(move || {
let sentinel = Sentinel::new(supervisor, id); let _sentinel = Sentinel::new(supervisor, id);
loop { loop {
match acceptor.accept() { match acceptor.accept() {
Ok(stream) => work(stream), Ok(stream) => work(stream),
Err(ref e) if e.kind == EndOfFile => {
debug!("Server closed.");
sentinel.cancel();
return;
},
Err(e) => { Err(e) => {
error!("Connection failed: {}", e); error!("Connection failed: {}", e);
} }
@@ -72,7 +65,7 @@ where A: NetworkAcceptor + 'a,
struct Sentinel<T: Send> { struct Sentinel<T: Send> {
value: Option<T>, value: Option<T>,
supervisor: mpsc::Sender<T>, supervisor: mpsc::Sender<T>,
active: bool //active: bool
} }
impl<T: Send> Sentinel<T> { impl<T: Send> Sentinel<T> {
@@ -80,18 +73,18 @@ impl<T: Send> Sentinel<T> {
Sentinel { Sentinel {
value: Some(data), value: Some(data),
supervisor: channel, supervisor: channel,
active: true //active: true
} }
} }
fn cancel(mut self) { self.active = false; } //fn cancel(mut self) { self.active = false; }
} }
#[unsafe_destructor] #[unsafe_destructor]
impl<T: Send + 'static> Drop for Sentinel<T> { impl<T: Send + 'static> Drop for Sentinel<T> {
fn drop(&mut self) { fn drop(&mut self) {
// If we were cancelled, get out of here. // If we were cancelled, get out of here.
if !self.active { return; } //if !self.active { return; }
// Respawn ourselves // Respawn ourselves
let _ = self.supervisor.send(self.value.take().unwrap()); let _ = self.supervisor.send(self.value.take().unwrap());

View File

@@ -1,7 +1,9 @@
//! HTTP Server //! HTTP Server
use std::old_io::{Listener, BufferedReader, BufferedWriter}; use std::io::{BufReader, BufWriter};
use std::old_io::net::ip::{IpAddr, Port, SocketAddr}; use std::marker::PhantomData;
use std::net::{IpAddr, SocketAddr};
use std::os; use std::os;
use std::path::Path;
use std::thread::{self, JoinGuard}; use std::thread::{self, JoinGuard};
pub use self::request::Request; pub use self::request::Request;
@@ -13,25 +15,24 @@ use HttpError::HttpIoError;
use {HttpResult}; use {HttpResult};
use header::Connection; use header::Connection;
use header::ConnectionOption::{Close, KeepAlive}; use header::ConnectionOption::{Close, KeepAlive};
use net::{NetworkListener, NetworkStream, NetworkAcceptor, use net::{NetworkListener, NetworkStream, HttpListener};
HttpAcceptor, HttpListener};
use version::HttpVersion::{Http10, Http11}; use version::HttpVersion::{Http10, Http11};
use self::acceptor::AcceptorPool; use self::listener::ListenerPool;
pub mod request; pub mod request;
pub mod response; pub mod response;
mod acceptor; mod listener;
/// A server can listen on a TCP socket. /// A server can listen on a TCP socket.
/// ///
/// Once listening, it will create a `Request`/`Response` pair for each /// Once listening, it will create a `Request`/`Response` pair for each
/// incoming connection, and hand them to the provided handler. /// incoming connection, and hand them to the provided handler.
pub struct Server<L = HttpListener> { pub struct Server<'a, H: Handler, L = HttpListener> {
ip: IpAddr, handler: H,
port: Port, ssl: Option<(&'a Path, &'a Path)>,
listener: L, _marker: PhantomData<L>
} }
macro_rules! try_option( macro_rules! try_option(
@@ -43,38 +44,59 @@ macro_rules! try_option(
}} }}
); );
impl Server<HttpListener> { impl<'a, H: Handler, L: NetworkListener> Server<'a, H, L> {
/// Creates a new server that will handle `HttpStream`s. pub fn new(handler: H) -> Server<'a, H, L> {
pub fn http(ip: IpAddr, port: Port) -> Server { Server {
Server::with_listener(ip, port, HttpListener::Http) handler: handler,
} ssl: None,
/// Creates a new server that will handler `HttpStreams`s using a TLS connection. _marker: PhantomData
pub fn https(ip: IpAddr, port: Port, cert: Path, key: Path) -> Server { }
Server::with_listener(ip, port, HttpListener::Https(cert, key))
} }
} }
impl< impl<'a, H: Handler + 'static> Server<'a, H, HttpListener> {
L: NetworkListener<Acceptor=A> + Send,
A: NetworkAcceptor<Stream=S> + Send + 'static,
S: NetworkStream + Clone + Send> Server<L> {
/// Creates a new server that will handle `HttpStream`s. /// Creates a new server that will handle `HttpStream`s.
pub fn with_listener(ip: IpAddr, port: Port, listener: L) -> Server<L> { pub fn http(handler: H) -> Server<'a, H, HttpListener> {
Server::new(handler)
}
/// Creates a new server that will handler `HttpStreams`s using a TLS connection.
pub fn https(handler: H, cert: &'a Path, key: &'a Path) -> Server<'a, H, HttpListener> {
Server { Server {
ip: ip, handler: handler,
port: port, ssl: Some((cert, key)),
listener: listener, _marker: PhantomData
} }
} }
}
impl<'a, H: Handler + 'static> Server<'a, H, HttpListener> {
/// Binds to a socket, and starts handling connections using a task pool. /// Binds to a socket, and starts handling connections using a task pool.
pub fn listen_threads<H: Handler + 'static>(mut self, handler: H, threads: usize) -> HttpResult<Listening<L::Acceptor>> { pub fn listen_threads(self, ip: IpAddr, port: u16, threads: usize) -> HttpResult<Listening> {
debug!("binding to {:?}:{:?}", self.ip, self.port); let addr = &(ip, port);
let acceptor = try!(self.listener.listen((self.ip, self.port))); let listener = try!(match self.ssl {
let socket = try!(acceptor.socket_name()); Some((cert, key)) => HttpListener::https(addr, cert, key),
None => HttpListener::http(addr)
});
self.with_listener(listener, threads)
}
/// Binds to a socket and starts handling connections.
pub fn listen(self, ip: IpAddr, port: u16) -> HttpResult<Listening> {
self.listen_threads(ip, port, os::num_cpus() * 5 / 4)
}
}
impl<
'a,
H: Handler + 'static,
L: NetworkListener<Stream=S> + Send + 'static,
S: NetworkStream + Clone + Send> Server<'a, H, L> {
/// Creates a new server that will handle `HttpStream`s.
pub fn with_listener(self, mut listener: L, threads: usize) -> HttpResult<Listening> {
let socket = try!(listener.socket_addr());
let handler = self.handler;
debug!("threads = {:?}", threads); debug!("threads = {:?}", threads);
let pool = AcceptorPool::new(acceptor.clone()); let pool = ListenerPool::new(listener.clone());
let work = move |stream| handle_connection(stream, &handler); let work = move |stream| handle_connection(stream, &handler);
let guard = thread::scoped(move || pool.accept(work, threads)); let guard = thread::scoped(move || pool.accept(work, threads));
@@ -82,21 +104,15 @@ S: NetworkStream + Clone + Send> Server<L> {
Ok(Listening { Ok(Listening {
_guard: guard, _guard: guard,
socket: socket, socket: socket,
acceptor: acceptor
}) })
} }
/// Binds to a socket and starts handling connections.
pub fn listen<H: Handler + 'static>(self, handler: H) -> HttpResult<Listening<L::Acceptor>> {
self.listen_threads(handler, os::num_cpus() * 5 / 4)
}
} }
fn handle_connection<S, H>(mut stream: S, handler: &H) fn handle_connection<S, H>(mut stream: S, handler: &H)
where S: NetworkStream + Clone, H: Handler { where S: NetworkStream + Clone, H: Handler {
debug!("Incoming stream"); debug!("Incoming stream");
let addr = match stream.peer_name() { let addr = match stream.peer_addr() {
Ok(addr) => addr, Ok(addr) => addr,
Err(e) => { Err(e) => {
error!("Peer Name error: {:?}", e); error!("Peer Name error: {:?}", e);
@@ -104,8 +120,8 @@ where S: NetworkStream + Clone, H: Handler {
} }
}; };
let mut rdr = BufferedReader::new(stream.clone()); let mut rdr = BufReader::new(stream.clone());
let mut wrt = BufferedWriter::new(stream); let mut wrt = BufWriter::new(stream);
let mut keep_alive = true; let mut keep_alive = true;
while keep_alive { while keep_alive {
@@ -135,18 +151,17 @@ where S: NetworkStream + Clone, H: Handler {
} }
/// A listening server, which can later be closed. /// A listening server, which can later be closed.
pub struct Listening<A = HttpAcceptor> { pub struct Listening {
acceptor: A,
_guard: JoinGuard<'static, ()>, _guard: JoinGuard<'static, ()>,
/// The socket addresses that the server is bound to. /// The socket addresses that the server is bound to.
pub socket: SocketAddr, pub socket: SocketAddr,
} }
impl<A: NetworkAcceptor> Listening<A> { impl Listening {
/// Stop the server from listening to its socket address. /// Stop the server from listening to its socket address.
pub fn close(&mut self) -> HttpResult<()> { pub fn close(&mut self) -> HttpResult<()> {
debug!("closing server"); debug!("closing server");
try!(self.acceptor.close()); //try!(self.acceptor.close());
Ok(()) Ok(())
} }
} }

View File

@@ -2,8 +2,8 @@
//! //!
//! These are requests that a `hyper::Server` receives, and include its method, //! These are requests that a `hyper::Server` receives, and include its method,
//! target URI, headers, and message body. //! target URI, headers, and message body.
use std::old_io::IoResult; use std::io::{self, Read};
use std::old_io::net::ip::SocketAddr; use std::net::SocketAddr;
use {HttpResult}; use {HttpResult};
use version::{HttpVersion}; use version::{HttpVersion};
@@ -26,14 +26,14 @@ pub struct Request<'a> {
pub uri: RequestUri, pub uri: RequestUri,
/// The version of HTTP for this request. /// The version of HTTP for this request.
pub version: HttpVersion, pub version: HttpVersion,
body: HttpReader<&'a mut (Reader + 'a)> body: HttpReader<&'a mut (Read + 'a)>
} }
impl<'a> Request<'a> { impl<'a> Request<'a> {
/// Create a new Request, reading the StartLine and Headers so they are /// Create a new Request, reading the StartLine and Headers so they are
/// immediately useful. /// immediately useful.
pub fn new(mut stream: &'a mut (Reader + 'a), addr: SocketAddr) -> HttpResult<Request<'a>> { pub fn new(mut stream: &'a mut (Read + 'a), addr: SocketAddr) -> HttpResult<Request<'a>> {
let (method, uri, version) = try!(read_request_line(&mut stream)); let (method, uri, version) = try!(read_request_line(&mut stream));
debug!("Request Line: {:?} {:?} {:?}", method, uri, version); debug!("Request Line: {:?} {:?} {:?}", method, uri, version);
let headers = try!(Headers::from_raw(&mut stream)); let headers = try!(Headers::from_raw(&mut stream));
@@ -66,14 +66,14 @@ impl<'a> Request<'a> {
/// Deconstruct a Request into its constituent parts. /// Deconstruct a Request into its constituent parts.
pub fn deconstruct(self) -> (SocketAddr, Method, Headers, pub fn deconstruct(self) -> (SocketAddr, Method, Headers,
RequestUri, HttpVersion, RequestUri, HttpVersion,
HttpReader<&'a mut (Reader + 'a)>,) { HttpReader<&'a mut (Read + 'a)>,) {
(self.remote_addr, self.method, self.headers, (self.remote_addr, self.method, self.headers,
self.uri, self.version, self.body) self.uri, self.version, self.body)
} }
} }
impl<'a> Reader for Request<'a> { impl<'a> Read for Request<'a> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.body.read(buf) self.body.read(buf)
} }
} }
@@ -84,12 +84,19 @@ mod tests {
use mock::MockStream; use mock::MockStream;
use super::Request; use super::Request;
use std::old_io::net::ip::SocketAddr; use std::io::{self, Read};
use std::net::SocketAddr;
fn sock(s: &str) -> SocketAddr { fn sock(s: &str) -> SocketAddr {
s.parse().unwrap() s.parse().unwrap()
} }
fn read_to_string(mut req: Request) -> io::Result<String> {
let mut s = String::new();
try!(req.read_to_string(&mut s));
Ok(s)
}
#[test] #[test]
fn test_get_empty_body() { fn test_get_empty_body() {
let mut stream = MockStream::with_input(b"\ let mut stream = MockStream::with_input(b"\
@@ -99,8 +106,8 @@ mod tests {
I'm a bad request.\r\n\ I'm a bad request.\r\n\
"); ");
let mut req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap(); let req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap();
assert_eq!(req.read_to_string(), Ok("".to_string())); assert_eq!(read_to_string(req), Ok("".to_string()));
} }
#[test] #[test]
@@ -112,8 +119,8 @@ mod tests {
I'm a bad request.\r\n\ I'm a bad request.\r\n\
"); ");
let mut req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap(); let req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap();
assert_eq!(req.read_to_string(), Ok("".to_string())); assert_eq!(read_to_string(req), Ok("".to_string()));
} }
#[test] #[test]
@@ -125,8 +132,8 @@ mod tests {
I'm a bad request.\r\n\ I'm a bad request.\r\n\
"); ");
let mut req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap(); let req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap();
assert_eq!(req.read_to_string(), Ok("".to_string())); assert_eq!(read_to_string(req), Ok("".to_string()));
} }
#[test] #[test]
@@ -146,7 +153,7 @@ mod tests {
\r\n" \r\n"
); );
let mut req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap(); let req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap();
// The headers are correct? // The headers are correct?
match req.headers.get::<Host>() { match req.headers.get::<Host>() {
@@ -163,8 +170,7 @@ mod tests {
None => panic!("Transfer-Encoding: chunked expected!"), None => panic!("Transfer-Encoding: chunked expected!"),
}; };
// The content is correctly read? // The content is correctly read?
let body = req.read_to_string().unwrap(); assert_eq!(read_to_string(req), Ok("qwert".to_string()));
assert_eq!("qwert", body);
} }
/// Tests that when a chunk size is not a valid radix-16 number, an error /// Tests that when a chunk size is not a valid radix-16 number, an error
@@ -182,9 +188,9 @@ mod tests {
\r\n" \r\n"
); );
let mut req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap(); let req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap();
assert!(req.read_to_string().is_err()); assert!(read_to_string(req).is_err());
} }
/// Tests that when a chunk size contains an invalid extension, an error is /// Tests that when a chunk size contains an invalid extension, an error is
@@ -202,9 +208,9 @@ mod tests {
\r\n" \r\n"
); );
let mut req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap(); let req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap();
assert!(req.read_to_string().is_err()); assert!(read_to_string(req).is_err());
} }
/// Tests that when a valid extension that contains a digit is appended to /// Tests that when a valid extension that contains a digit is appended to
@@ -222,9 +228,9 @@ mod tests {
\r\n" \r\n"
); );
let mut req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap(); let req = Request::new(&mut stream, sock("127.0.0.1:80")).unwrap();
assert_eq!("1", req.read_to_string().unwrap()) assert_eq!(read_to_string(req), Ok("1".to_string()));
} }
} }

View File

@@ -2,8 +2,8 @@
//! //!
//! These are responses sent by a `hyper::Server` to clients, after //! These are responses sent by a `hyper::Server` to clients, after
//! receiving a request. //! receiving a request.
use std::old_io::IoResult;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::io::{self, Write};
use time::now_utc; use time::now_utc;
@@ -19,7 +19,7 @@ pub struct Response<'a, W = Fresh> {
/// The HTTP version of this response. /// The HTTP version of this response.
pub version: version::HttpVersion, pub version: version::HttpVersion,
// Stream the Response is writing to, not accessible through UnwrittenResponse // Stream the Response is writing to, not accessible through UnwrittenResponse
body: HttpWriter<&'a mut (Writer + 'a)>, body: HttpWriter<&'a mut (Write + 'a)>,
// The status code for the request. // The status code for the request.
status: status::StatusCode, status: status::StatusCode,
// The outgoing headers on this response. // The outgoing headers on this response.
@@ -38,7 +38,7 @@ impl<'a, W> Response<'a, W> {
/// Construct a Response from its constituent parts. /// Construct a Response from its constituent parts.
pub fn construct(version: version::HttpVersion, pub fn construct(version: version::HttpVersion,
body: HttpWriter<&'a mut (Writer + 'a)>, body: HttpWriter<&'a mut (Write + 'a)>,
status: status::StatusCode, status: status::StatusCode,
headers: header::Headers) -> Response<'a, Fresh> { headers: header::Headers) -> Response<'a, Fresh> {
Response { Response {
@@ -51,7 +51,7 @@ impl<'a, W> Response<'a, W> {
} }
/// Deconstruct this Response into its constituent parts. /// Deconstruct this Response into its constituent parts.
pub fn deconstruct(self) -> (version::HttpVersion, HttpWriter<&'a mut (Writer + 'a)>, pub fn deconstruct(self) -> (version::HttpVersion, HttpWriter<&'a mut (Write + 'a)>,
status::StatusCode, header::Headers) { status::StatusCode, header::Headers) {
(self.version, self.body, self.status, self.headers) (self.version, self.body, self.status, self.headers)
} }
@@ -59,7 +59,7 @@ impl<'a, W> Response<'a, W> {
impl<'a> Response<'a, Fresh> { impl<'a> Response<'a, Fresh> {
/// Creates a new Response that can be used to write to a network stream. /// Creates a new Response that can be used to write to a network stream.
pub fn new(stream: &'a mut (Writer + 'a)) -> Response<'a, Fresh> { pub fn new(stream: &'a mut (Write + 'a)) -> Response<'a, Fresh> {
Response { Response {
status: status::StatusCode::Ok, status: status::StatusCode::Ok,
version: version::HttpVersion::Http11, version: version::HttpVersion::Http11,
@@ -70,7 +70,7 @@ impl<'a> Response<'a, Fresh> {
} }
/// Consume this Response<Fresh>, writing the Headers and Status and creating a Response<Streaming> /// Consume this Response<Fresh>, writing the Headers and Status and creating a Response<Streaming>
pub fn start(mut self) -> IoResult<Response<'a, Streaming>> { pub fn start(mut self) -> io::Result<Response<'a, Streaming>> {
debug!("writing head: {:?} {:?}", self.version, self.status); debug!("writing head: {:?} {:?}", self.version, self.status);
try!(write!(&mut self.body, "{} {}{}{}", self.version, self.status, CR as char, LF as char)); try!(write!(&mut self.body, "{} {}{}{}", self.version, self.status, CR as char, LF as char));
@@ -110,13 +110,12 @@ impl<'a> Response<'a, Fresh> {
debug!("headers [\n{:?}]", self.headers); debug!("headers [\n{:?}]", self.headers);
try!(write!(&mut self.body, "{}", self.headers)); try!(write!(&mut self.body, "{}", self.headers));
try!(write!(&mut self.body, "{}", LINE_ENDING));
try!(self.body.write_str(LINE_ENDING));
let stream = if chunked { let stream = if chunked {
ChunkedWriter(self.body.unwrap()) ChunkedWriter(self.body.into_inner())
} else { } else {
SizedWriter(self.body.unwrap(), len) SizedWriter(self.body.into_inner(), len)
}; };
// "copy" to change the phantom type // "copy" to change the phantom type
@@ -139,20 +138,20 @@ impl<'a> Response<'a, Fresh> {
impl<'a> Response<'a, Streaming> { impl<'a> Response<'a, Streaming> {
/// Flushes all writing of a response to the client. /// Flushes all writing of a response to the client.
pub fn end(self) -> IoResult<()> { pub fn end(self) -> io::Result<()> {
debug!("ending"); debug!("ending");
try!(self.body.end()); try!(self.body.end());
Ok(()) Ok(())
} }
} }
impl<'a> Writer for Response<'a, Streaming> { impl<'a> Write for Response<'a, Streaming> {
fn write_all(&mut self, msg: &[u8]) -> IoResult<()> { fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
debug!("write {:?} bytes", msg.len()); debug!("write {:?} bytes", msg.len());
self.body.write_all(msg) self.body.write(msg)
} }
fn flush(&mut self) -> IoResult<()> { fn flush(&mut self) -> io::Result<()> {
self.body.flush() self.body.flush()
} }
} }