Merge pull request #448 from mlalic/http2-initial

feat(client): initial support for HTTP/2
This commit is contained in:
Sean McArthur
2015-06-02 10:11:45 -07:00
11 changed files with 1507 additions and 188 deletions

View File

@@ -24,6 +24,7 @@ unicase = "0.1"
url = "0.2"
traitobject = "0.0.1"
typeable = "0.1"
solicit = "0.2"
[dev-dependencies]
env_logger = "*"

34
examples/client_http2.rs Normal file
View File

@@ -0,0 +1,34 @@
#![deny(warnings)]
extern crate hyper;
extern crate env_logger;
use std::env;
use std::io;
use hyper::Client;
use hyper::header::Connection;
use hyper::http2;
fn main() {
env_logger::init().unwrap();
let url = match env::args().nth(1) {
Some(url) => url,
None => {
println!("Usage: client <url>");
return;
}
};
let mut client = Client::with_protocol(http2::new_protocol());
// `Connection: Close` is not a valid header for HTTP/2, but the client handles it gracefully.
let mut res = client.get(&*url)
.header(Connection::close())
.send().unwrap();
println!("Response: {}", res.status);
println!("Headers:\n{}", res.headers);
io::copy(&mut res, &mut io::stdout()).unwrap();
}

View File

@@ -54,11 +54,14 @@ pub mod pool;
pub mod request;
pub mod response;
use message::Protocol;
use http11::Http11Protocol;
/// A Client to use additional features with Requests.
///
/// Clients can handle things such as: redirect policy, connection pooling.
pub struct Client {
connector: Connector,
protocol: Box<Protocol + Send>,
redirect_policy: RedirectPolicy,
}
@@ -77,15 +80,20 @@ impl Client {
/// Create a new client with a specific connector.
pub fn with_connector<C, S>(connector: C) -> Client
where C: NetworkConnector<Stream=S> + Send + 'static, S: NetworkStream + Send {
Client::with_protocol(Http11Protocol::with_connector(connector))
}
/// Create a new client with a specific `Protocol`.
pub fn with_protocol<P: Protocol + Send + 'static>(protocol: P) -> Client {
Client {
connector: with_connector(connector),
protocol: Box::new(protocol),
redirect_policy: Default::default()
}
}
/// Set the SSL verifier callback for use with OpenSSL.
pub fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
self.connector.set_ssl_verifier(verifier);
self.protocol.set_ssl_verifier(verifier);
}
/// Set the RedirectPolicy.
@@ -131,44 +139,10 @@ impl Client {
}
}
fn with_connector<C: NetworkConnector<Stream=S> + Send + 'static, S: NetworkStream + Send>(c: C) -> Connector {
Connector(Box::new(ConnAdapter(c)))
}
impl Default for Client {
fn default() -> Client { Client::new() }
}
struct ConnAdapter<C: NetworkConnector + Send>(C);
impl<C: NetworkConnector<Stream=S> + Send, S: NetworkStream + Send> NetworkConnector for ConnAdapter<C> {
type Stream = Box<NetworkStream + Send>;
#[inline]
fn connect(&self, host: &str, port: u16, scheme: &str)
-> ::Result<Box<NetworkStream + Send>> {
Ok(try!(self.0.connect(host, port, scheme)).into())
}
#[inline]
fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
self.0.set_ssl_verifier(verifier);
}
}
struct Connector(Box<NetworkConnector<Stream=Box<NetworkStream + Send>> + Send>);
impl NetworkConnector for Connector {
type Stream = Box<NetworkStream + Send>;
#[inline]
fn connect(&self, host: &str, port: u16, scheme: &str)
-> ::Result<Box<NetworkStream + Send>> {
Ok(try!(self.0.connect(host, port, scheme)).into())
}
#[inline]
fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
self.0.set_ssl_verifier(verifier);
}
}
/// Options for an individual Request.
///
/// One of these will be built for you if you use one of the convenience
@@ -229,7 +203,11 @@ impl<'a, U: IntoUrl> RequestBuilder<'a, U> {
};
loop {
let mut req = try!(Request::with_connector(method.clone(), url.clone(), &client.connector));
let message = {
let (host, port) = try!(get_host_and_port(&url));
try!(client.protocol.new_message(&host, port, &*url.scheme))
};
let mut req = try!(Request::with_message(method.clone(), url.clone(), message));
headers.as_ref().map(|headers| req.headers_mut().extend(headers.iter()));
match (can_have_body, body.as_ref()) {

View File

@@ -1,18 +1,19 @@
//! Client Requests
use std::marker::PhantomData;
use std::io::{self, Write, BufWriter};
use std::io::{self, Write};
use url::Url;
use method::{self, Method};
use header::Headers;
use header::{self, Host};
use header::Host;
use net::{NetworkStream, NetworkConnector, HttpConnector, Fresh, Streaming};
use http::{HttpWriter, LINE_ENDING};
use http::HttpWriter::{ThroughWriter, ChunkedWriter, SizedWriter, EmptyWriter};
use version;
use client::{Response, get_host_and_port};
use message::{HttpMessage, RequestHead};
use http11::Http11Message;
/// A client request to a remote server.
/// The W type tracks the state of the request, Fresh vs Streaming.
@@ -23,7 +24,7 @@ pub struct Request<W> {
/// The HTTP version of this request.
pub version: version::HttpVersion,
body: HttpWriter<BufWriter<Box<NetworkStream + Send>>>,
message: Box<HttpMessage>,
headers: Headers,
method: method::Method,
@@ -41,6 +42,28 @@ impl<W> Request<W> {
}
impl Request<Fresh> {
/// Create a new `Request<Fresh>` that will use the given `HttpMessage` for its communication
/// with the server. This implies that the given `HttpMessage` instance has already been
/// properly initialized by the caller (e.g. a TCP connection's already established).
pub fn with_message(method: method::Method, url: Url, message: Box<HttpMessage>)
-> ::Result<Request<Fresh>> {
let (host, port) = try!(get_host_and_port(&url));
let mut headers = Headers::new();
headers.set(Host {
hostname: host,
port: Some(port),
});
Ok(Request {
method: method,
headers: headers,
url: url,
version: version::HttpVersion::Http11,
message: message,
_marker: PhantomData,
})
}
/// Create a new client request.
pub fn new(method: method::Method, url: Url) -> ::Result<Request<Fresh>> {
let mut conn = HttpConnector(None);
@@ -53,92 +76,26 @@ impl Request<Fresh> {
C: NetworkConnector<Stream=S>,
S: Into<Box<NetworkStream + Send>> {
let (host, port) = try!(get_host_and_port(&url));
let stream = try!(connector.connect(&*host, port, &*url.scheme)).into();
let stream = ThroughWriter(BufWriter::new(stream));
let mut headers = Headers::new();
headers.set(Host {
hostname: host,
port: Some(port),
});
Ok(Request {
method: method,
headers: headers,
url: url,
version: version::HttpVersion::Http11,
body: stream,
_marker: PhantomData,
})
Request::with_message(method, url, Box::new(Http11Message::with_stream(stream)))
}
/// Consume a Fresh Request, writing the headers and method,
/// returning a Streaming Request.
pub fn start(mut self) -> ::Result<Request<Streaming>> {
let mut uri = self.url.serialize_path().unwrap();
if let Some(ref q) = self.url.query {
uri.push('?');
uri.push_str(&q[..]);
}
debug!("request line: {:?} {:?} {:?}", self.method, uri, self.version);
try!(write!(&mut self.body, "{} {} {}{}",
self.method, uri, self.version, LINE_ENDING));
let stream = match self.method {
Method::Get | Method::Head => {
debug!("headers={:?}", self.headers);
try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING));
EmptyWriter(self.body.into_inner())
},
_ => {
let mut chunked = true;
let mut len = 0;
match self.headers.get::<header::ContentLength>() {
Some(cl) => {
chunked = false;
len = **cl;
},
None => ()
};
// can't do in match above, thanks borrowck
if chunked {
let encodings = match self.headers.get_mut::<header::TransferEncoding>() {
Some(&mut header::TransferEncoding(ref mut encodings)) => {
//TODO: check if chunked is already in encodings. use HashSet?
encodings.push(header::Encoding::Chunked);
false
},
None => true
};
if encodings {
self.headers.set::<header::TransferEncoding>(
header::TransferEncoding(vec![header::Encoding::Chunked]))
}
}
debug!("headers={:?}", self.headers);
try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING));
if chunked {
ChunkedWriter(self.body.into_inner())
} else {
SizedWriter(self.body.into_inner(), len)
}
}
};
let head = try!(self.message.set_outgoing(RequestHead {
headers: self.headers,
method: self.method,
url: self.url,
}));
Ok(Request {
method: self.method,
headers: self.headers,
url: self.url,
method: head.method,
headers: head.headers,
url: head.url,
version: self.version,
body: stream,
message: self.message,
_marker: PhantomData,
})
}
@@ -153,20 +110,19 @@ impl Request<Streaming> {
///
/// Consumes the Request.
pub fn send(self) -> ::Result<Response> {
let raw = try!(self.body.end()).into_inner().unwrap(); // end() already flushes
Response::new(raw)
Response::with_message(self.message)
}
}
impl Write for Request<Streaming> {
#[inline]
fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
self.body.write(msg)
self.message.write(msg)
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
self.body.flush()
self.message.flush()
}
}
@@ -180,11 +136,15 @@ mod tests {
use header::{ContentLength,TransferEncoding,Encoding};
use url::form_urlencoded;
use super::Request;
use http11::Http11Message;
fn run_request(req: Request<Fresh>) -> Vec<u8> {
let req = req.start().unwrap();
let stream = *req.body.end().unwrap()
.into_inner().unwrap().downcast::<MockStream>().ok().unwrap();
let message = req.message;
let mut message = message.downcast::<Http11Message>().ok().unwrap();
message.flush_outgoing().unwrap();
let stream = *message
.into_inner().downcast::<MockStream>().ok().unwrap();
stream.write
}

View File

@@ -1,21 +1,17 @@
//! Client Responses
use std::io::{self, Read};
use std::marker::PhantomData;
use std::net::Shutdown;
use buffer::BufReader;
use header;
use header::{ContentLength, TransferEncoding};
use header::Encoding::Chunked;
use net::{NetworkStream, HttpStream};
use http::{self, HttpReader, RawStatus};
use http::HttpReader::{SizedReader, ChunkedReader, EofReader};
use net::NetworkStream;
use http::{self, RawStatus};
use status;
use version;
use message::{ResponseHead, HttpMessage};
use http11::Http11Message;
/// A response for a client request to a remote server.
#[derive(Debug)]
pub struct Response<S = HttpStream> {
pub struct Response {
/// The status from the server.
pub status: status::StatusCode,
/// The headers from the server.
@@ -23,9 +19,7 @@ pub struct Response<S = HttpStream> {
/// The HTTP version of this response from the server.
pub version: version::HttpVersion,
status_raw: RawStatus,
body: HttpReader<BufReader<Box<NetworkStream + Send>>>,
_marker: PhantomData<S>,
message: Box<HttpMessage>,
}
impl Response {
@@ -33,50 +27,23 @@ impl Response {
/// Creates a new response from a server.
pub fn new(stream: Box<NetworkStream + Send>) -> ::Result<Response> {
trace!("Response::new");
let mut stream = BufReader::new(stream);
let head = try!(http::parse_response(&mut stream));
let raw_status = head.subject;
let headers = head.headers;
Response::with_message(Box::new(Http11Message::with_stream(stream)))
}
/// Creates a new response received from the server on the given `HttpMessage`.
pub fn with_message(mut message: Box<HttpMessage>) -> ::Result<Response> {
trace!("Response::with_message");
let ResponseHead { headers, raw_status, version } = try!(message.get_incoming());
let status = status::StatusCode::from_u16(raw_status.0);
debug!("version={:?}, status={:?}", head.version, status);
debug!("version={:?}, status={:?}", version, status);
debug!("headers={:?}", headers);
let body = if headers.has::<TransferEncoding>() {
match headers.get::<TransferEncoding>() {
Some(&TransferEncoding(ref codings)) => {
if codings.len() > 1 {
trace!("TODO: #2 handle other codings: {:?}", codings);
};
if codings.contains(&Chunked) {
ChunkedReader(stream, None)
} else {
trace!("not chuncked. read till eof");
EofReader(stream)
}
}
None => unreachable!()
}
} else if headers.has::<ContentLength>() {
match headers.get::<ContentLength>() {
Some(&ContentLength(len)) => SizedReader(stream, len),
None => unreachable!()
}
} else {
trace!("neither Transfer-Encoding nor Content-Length");
EofReader(stream)
};
Ok(Response {
status: status,
version: head.version,
version: version,
headers: headers,
body: body,
message: message,
status_raw: raw_status,
_marker: PhantomData,
})
}
@@ -84,21 +51,18 @@ impl Response {
pub fn status_raw(&self) -> &RawStatus {
&self.status_raw
}
/// Consumes the Request to return the NetworkStream underneath.
pub fn into_inner(self) -> Box<NetworkStream + Send> {
self.body.into_inner().into_inner()
}
}
impl Read for Response {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let count = try!(self.body.read(buf));
let count = try!(self.message.read(buf));
if count == 0 {
if !http::should_keep_alive(self.version, &self.headers) {
try!(self.body.get_mut().get_mut().close(Shutdown::Both));
try!(self.message.close_connection()
.map_err(|_| io::Error::new(io::ErrorKind::Other,
"Error closing connection")));
}
}
@@ -110,17 +74,15 @@ impl Read for Response {
mod tests {
use std::borrow::Cow::Borrowed;
use std::io::{self, Read};
use std::marker::PhantomData;
use buffer::BufReader;
use header::Headers;
use header::TransferEncoding;
use header::Encoding;
use http::HttpReader::EofReader;
use http::RawStatus;
use mock::MockStream;
use status;
use version;
use http11::Http11Message;
use super::Response;
@@ -137,12 +99,12 @@ mod tests {
status: status::StatusCode::Ok,
headers: Headers::new(),
version: version::HttpVersion::Http11,
body: EofReader(BufReader::new(Box::new(MockStream::new()))),
message: Box::new(Http11Message::with_stream(Box::new(MockStream::new()))),
status_raw: RawStatus(200, Borrowed("OK")),
_marker: PhantomData,
};
let b = res.into_inner().downcast::<MockStream>().ok().unwrap();
let message = res.message.downcast::<Http11Message>().ok().unwrap();
let b = message.into_inner().downcast::<MockStream>().ok().unwrap();
assert_eq!(b, Box::new(MockStream::new()));
}

View File

@@ -6,6 +6,7 @@ use std::io::Error as IoError;
use httparse;
use openssl::ssl::error::SslError;
use url;
use solicit::http::HttpError as Http2Error;
use self::Error::{
Method,
@@ -15,7 +16,8 @@ use self::Error::{
Status,
Io,
Ssl,
TooLarge
TooLarge,
Http2,
};
@@ -40,7 +42,9 @@ pub enum Error {
/// An `io::Error` that occurred while trying to read or write to a network stream.
Io(IoError),
/// An error from the `openssl` library.
Ssl(SslError)
Ssl(SslError),
/// An HTTP/2-specific error, coming from the `solicit` library.
Http2(Http2Error),
}
impl fmt::Display for Error {
@@ -60,6 +64,7 @@ impl StdError for Error {
Uri(ref e) => e.description(),
Io(ref e) => e.description(),
Ssl(ref e) => e.description(),
Http2(ref e) => e.description(),
}
}
@@ -68,6 +73,7 @@ impl StdError for Error {
Io(ref error) => Some(error),
Ssl(ref error) => Some(error),
Uri(ref error) => Some(error),
Http2(ref error) => Some(error),
_ => None,
}
}
@@ -108,12 +114,19 @@ impl From<httparse::Error> for Error {
}
}
impl From<Http2Error> for Error {
fn from(err: Http2Error) -> Error {
Error::Http2(err)
}
}
#[cfg(test)]
mod tests {
use std::error::Error as StdError;
use std::io;
use httparse;
use openssl::ssl::error::SslError;
use solicit::http::HttpError as Http2Error;
use url;
use super::Error;
use super::Error::*;
@@ -156,6 +169,7 @@ mod tests {
from_and_cause!(io::Error::new(io::ErrorKind::Other, "other") => Io(..));
from_and_cause!(url::ParseError::EmptyHost => Uri(..));
from_and_cause!(SslError::SslSessionClosed => Ssl(..));
from_and_cause!(Http2Error::UnknownStreamId => Http2(..));
from!(SslError::StreamError(io::Error::new(io::ErrorKind::Other, "ssl negotiation")) => Io(..));

305
src/http11.rs Normal file
View File

@@ -0,0 +1,305 @@
//! Adapts the HTTP/1.1 implementation into the `HttpMessage` API.
use std::io::{self, Write, BufWriter, Read};
use std::net::Shutdown;
use method::{Method};
use header::{ContentLength, TransferEncoding};
use header::Encoding::Chunked;
use net::{NetworkConnector, NetworkStream, ContextVerifier};
use http::{HttpWriter, LINE_ENDING};
use http::HttpReader::{SizedReader, ChunkedReader, EofReader};
use http::HttpWriter::{ChunkedWriter, SizedWriter, EmptyWriter};
use buffer::BufReader;
use http::{self, HttpReader};
use message::{
Protocol,
HttpMessage,
RequestHead,
ResponseHead,
};
use header;
use version;
/// An implementation of the `HttpMessage` trait for HTTP/1.1.
#[derive(Debug)]
pub struct Http11Message {
stream: Option<Box<NetworkStream + Send>>,
writer: Option<HttpWriter<BufWriter<Box<NetworkStream + Send>>>>,
reader: Option<HttpReader<BufReader<Box<NetworkStream + Send>>>>,
}
impl Write for Http11Message {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.writer {
None => Err(io::Error::new(io::ErrorKind::Other,
"Not in a writable state")),
Some(ref mut writer) => writer.write(buf),
}
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
match self.writer {
None => Err(io::Error::new(io::ErrorKind::Other,
"Not in a writable state")),
Some(ref mut writer) => writer.flush(),
}
}
}
impl Read for Http11Message {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.reader {
None => Err(io::Error::new(io::ErrorKind::Other,
"Not in a readable state")),
Some(ref mut reader) => reader.read(buf),
}
}
}
impl HttpMessage for Http11Message {
fn set_outgoing(&mut self, mut head: RequestHead) -> ::Result<RequestHead> {
if self.stream.is_none() {
return Err(From::from(io::Error::new(
io::ErrorKind::Other,
"Message not idle, cannot start new outgoing")));
}
let mut stream = BufWriter::new(self.stream.take().unwrap());
let mut uri = head.url.serialize_path().unwrap();
if let Some(ref q) = head.url.query {
uri.push('?');
uri.push_str(&q[..]);
}
let version = version::HttpVersion::Http11;
debug!("request line: {:?} {:?} {:?}", head.method, uri, version);
try!(write!(&mut stream, "{} {} {}{}",
head.method, uri, version, LINE_ENDING));
let stream = match head.method {
Method::Get | Method::Head => {
debug!("headers={:?}", head.headers);
try!(write!(&mut stream, "{}{}", head.headers, LINE_ENDING));
EmptyWriter(stream)
},
_ => {
let mut chunked = true;
let mut len = 0;
match head.headers.get::<header::ContentLength>() {
Some(cl) => {
chunked = false;
len = **cl;
},
None => ()
};
// can't do in match above, thanks borrowck
if chunked {
let encodings = match head.headers.get_mut::<header::TransferEncoding>() {
Some(&mut header::TransferEncoding(ref mut encodings)) => {
//TODO: check if chunked is already in encodings. use HashSet?
encodings.push(header::Encoding::Chunked);
false
},
None => true
};
if encodings {
head.headers.set::<header::TransferEncoding>(
header::TransferEncoding(vec![header::Encoding::Chunked]))
}
}
debug!("headers={:?}", head.headers);
try!(write!(&mut stream, "{}{}", head.headers, LINE_ENDING));
if chunked {
ChunkedWriter(stream)
} else {
SizedWriter(stream, len)
}
}
};
self.writer = Some(stream);
Ok(head)
}
fn get_incoming(&mut self) -> ::Result<ResponseHead> {
try!(self.flush_outgoing());
if self.stream.is_none() {
// The message was already in the reading state...
// TODO Decide what happens in case we try to get a new incoming at that point
return Err(From::from(
io::Error::new(io::ErrorKind::Other,
"Read already in progress")));
}
let stream = self.stream.take().unwrap();
let mut stream = BufReader::new(stream);
let head = try!(http::parse_response(&mut stream));
let raw_status = head.subject;
let headers = head.headers;
let body = if headers.has::<TransferEncoding>() {
match headers.get::<TransferEncoding>() {
Some(&TransferEncoding(ref codings)) => {
if codings.len() > 1 {
trace!("TODO: #2 handle other codings: {:?}", codings);
};
if codings.contains(&Chunked) {
ChunkedReader(stream, None)
} else {
trace!("not chuncked. read till eof");
EofReader(stream)
}
}
None => unreachable!()
}
} else if headers.has::<ContentLength>() {
match headers.get::<ContentLength>() {
Some(&ContentLength(len)) => SizedReader(stream, len),
None => unreachable!()
}
} else {
trace!("neither Transfer-Encoding nor Content-Length");
EofReader(stream)
};
self.reader = Some(body);
Ok(ResponseHead {
headers: headers,
raw_status: raw_status,
version: head.version,
})
}
fn close_connection(&mut self) -> ::Result<()> {
try!(self.get_mut().close(Shutdown::Both));
Ok(())
}
}
impl Http11Message {
/// Consumes the `Http11Message` and returns the underlying `NetworkStream`.
pub fn into_inner(mut self) -> Box<NetworkStream + Send> {
if self.stream.is_some() {
self.stream.take().unwrap()
} else if self.writer.is_some() {
self.writer.take().unwrap().into_inner().into_inner().unwrap()
} else if self.reader.is_some() {
self.reader.take().unwrap().into_inner().into_inner()
} else {
panic!("Http11Message lost its underlying stream somehow");
}
}
/// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the
/// `Http11Message`.
pub fn get_mut(&mut self) -> &mut Box<NetworkStream + Send> {
if self.stream.is_some() {
self.stream.as_mut().unwrap()
} else if self.writer.is_some() {
self.writer.as_mut().unwrap().get_mut().get_mut()
} else if self.reader.is_some() {
self.reader.as_mut().unwrap().get_mut().get_mut()
} else {
panic!("Http11Message lost its underlying stream somehow");
}
}
/// Creates a new `Http11Message` that will use the given `NetworkStream` for communicating to
/// the peer.
pub fn with_stream(stream: Box<NetworkStream + Send>) -> Http11Message {
Http11Message {
stream: Some(stream),
writer: None,
reader: None,
}
}
/// Flushes the current outgoing content and moves the stream into the `stream` property.
///
/// TODO It might be sensible to lift this up to the `HttpMessage` trait itself...
pub fn flush_outgoing(&mut self) -> ::Result<()> {
match self.writer {
None => return Ok(()),
Some(_) => {},
};
let writer = self.writer.take().unwrap();
let raw = try!(writer.end()).into_inner().unwrap(); // end() already flushes
self.stream = Some(raw);
Ok(())
}
}
/// The `Protocol` implementation provides HTTP/1.1 messages.
pub struct Http11Protocol {
connector: Connector,
}
impl Protocol for Http11Protocol {
fn new_message(&self, host: &str, port: u16, scheme: &str) -> ::Result<Box<HttpMessage>> {
let stream = try!(self.connector.connect(host, port, scheme)).into();
Ok(Box::new(Http11Message::with_stream(stream)))
}
#[inline]
fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
self.connector.set_ssl_verifier(verifier);
}
}
impl Http11Protocol {
/// Creates a new `Http11Protocol` instance that will use the given `NetworkConnector` for
/// establishing HTTP connections.
pub fn with_connector<C, S>(c: C) -> Http11Protocol
where C: NetworkConnector<Stream=S> + Send + 'static,
S: NetworkStream + Send {
Http11Protocol {
connector: Connector(Box::new(ConnAdapter(c))),
}
}
}
struct ConnAdapter<C: NetworkConnector + Send>(C);
impl<C: NetworkConnector<Stream=S> + Send, S: NetworkStream + Send> NetworkConnector for ConnAdapter<C> {
type Stream = Box<NetworkStream + Send>;
#[inline]
fn connect(&self, host: &str, port: u16, scheme: &str)
-> ::Result<Box<NetworkStream + Send>> {
Ok(try!(self.0.connect(host, port, scheme)).into())
}
#[inline]
fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
self.0.set_ssl_verifier(verifier);
}
}
struct Connector(Box<NetworkConnector<Stream=Box<NetworkStream + Send>> + Send>);
impl NetworkConnector for Connector {
type Stream = Box<NetworkStream + Send>;
#[inline]
fn connect(&self, host: &str, port: u16, scheme: &str)
-> ::Result<Box<NetworkStream + Send>> {
Ok(try!(self.0.connect(host, port, scheme)).into())
}
#[inline]
fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
self.0.set_ssl_verifier(verifier);
}
}

784
src/http2.rs Normal file
View File

@@ -0,0 +1,784 @@
//! Adapts the `solicit`-provided HTTP/2 implementation into the `HttpMessage` API.
use std::io::{self, Write, Read, Cursor};
use std::net::Shutdown;
use std::ascii::AsciiExt;
use std::mem;
use message::{
Protocol,
HttpMessage,
RequestHead,
ResponseHead,
};
use net::{NetworkStream, NetworkConnector, ContextVerifier};
use net::{HttpConnector, HttpStream};
use url::Url;
use http::RawStatus;
use header::Headers;
use header;
use version;
use solicit::http::Header as Http2Header;
use solicit::http::HttpScheme;
use solicit::http::HttpError as Http2Error;
use solicit::http::transport::TransportStream;
use solicit::http::client::{ClientStream, HttpConnect, write_preface};
use solicit::client::SimpleClient;
use httparse;
/// A trait alias representing all types that are both `NetworkStream` and `Clone`.
pub trait CloneableStream: NetworkStream + Clone {}
impl<S: NetworkStream + Clone> CloneableStream for S {}
/// A newtype wrapping any `CloneableStream` in order to provide an implementation of a
/// `TransportSream` trait for all types that are a `CloneableStream`.
#[derive(Clone)]
struct Http2Stream<S: CloneableStream>(S);
impl<S> Write for Http2Stream<S> where S: CloneableStream {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}
impl<S> Read for Http2Stream<S> where S: CloneableStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
}
impl<S> TransportStream for Http2Stream<S> where S: CloneableStream {
fn try_split(&self) -> Result<Http2Stream<S>, io::Error> {
Ok(self.clone())
}
fn close(&mut self) -> Result<(), io::Error> {
self.0.close(Shutdown::Both)
}
}
/// A helper struct that implements the `HttpConnect` trait from the `solicit` crate.
///
/// This is used by the `Http2Protocol` when it needs to create a new `SimpleClient`.
struct Http2Connector<S> where S: CloneableStream {
stream: S,
scheme: HttpScheme,
host: String,
}
impl<S> HttpConnect for Http2Connector<S> where S: CloneableStream {
/// The type of the underlying transport stream that the `HttpConnection`s
/// produced by this `HttpConnect` implementation will be based on.
type Stream = Http2Stream<S>;
/// The type of the error that can be produced by trying to establish the
/// connection (i.e. calling the `connect` method).
type Err = ::Error;
/// Establishes a network connection that can be used by HTTP/2 connections.
fn connect(mut self) -> Result<ClientStream<Self::Stream>, Self::Err> {
try!(write_preface(&mut self.stream));
Ok(ClientStream(Http2Stream(self.stream), self.scheme, self.host))
}
}
/// The `Protocol` implementation that provides HTTP/2 messages (i.e. `Http2Message`).
pub struct Http2Protocol<C, S> where C: NetworkConnector<Stream=S> + Send + 'static,
S: NetworkStream + Send + Clone {
connector: C,
}
impl<C, S> Http2Protocol<C, S> where C: NetworkConnector<Stream=S> + Send + 'static,
S: NetworkStream + Send + Clone {
/// Create a new `Http2Protocol` that will use the given `NetworkConnector` to establish TCP
/// connections to the server.
pub fn with_connector(connector: C) -> Http2Protocol<C, S> {
Http2Protocol {
connector: connector,
}
}
/// A private helper method that creates a new `SimpleClient` that will use the given
/// `NetworkStream` to communicate to the remote host.
fn new_client(&self, stream: S, host: String, scheme: HttpScheme)
-> ::Result<SimpleClient<Http2Stream<S>>> {
Ok(try!(SimpleClient::with_connector(Http2Connector {
stream: stream,
scheme: scheme,
host: host,
})))
}
}
impl<C, S> Protocol for Http2Protocol<C, S> where C: NetworkConnector<Stream=S> + Send + 'static,
S: NetworkStream + Send + Clone {
fn new_message(&self, host: &str, port: u16, scheme: &str) -> ::Result<Box<HttpMessage>> {
let stream = try!(self.connector.connect(host, port, scheme)).into();
let scheme = match scheme {
"http" => HttpScheme::Http,
"https" => HttpScheme::Https,
_ => return Err(From::from(Http2Error::from(
io::Error::new(io::ErrorKind::Other, "Invalid scheme")))),
};
let client = try!(self.new_client(stream, host.into(), scheme));
Ok(Box::new(Http2Message::with_client(client)))
}
#[inline]
fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
self.connector.set_ssl_verifier(verifier)
}
}
/// Represents an HTTP/2 request, described by a `RequestHead` and the body of the request.
/// A convenience struct only in use by the `Http2Message`.
#[derive(Clone, Debug)]
struct Http2Request {
head: RequestHead,
body: Vec<u8>,
}
/// Represents an HTTP/2 response.
/// A convenience struct only in use by the `Http2Message`.
#[derive(Clone, Debug)]
struct Http2Response {
body: Cursor<Vec<u8>>,
}
/// The enum tracks the state of the `Http2Message`.
enum MessageState {
/// State corresponding to no message being set to outgoing yet.
Idle,
/// State corresponding to an outgoing message being written out.
Writing(Http2Request),
/// State corresponding to an incoming message being read.
Reading(Http2Response),
}
impl MessageState {
fn take_request(&mut self) -> Option<Http2Request> {
match *self {
MessageState::Idle | MessageState::Reading(_) => return None,
MessageState::Writing(_) => {},
}
let old = mem::replace(self, MessageState::Idle);
match old {
// These states are effectively unreachable since we already know the state
MessageState::Idle | MessageState::Reading(_) => None,
MessageState::Writing(req) => Some(req),
}
}
}
/// An implementation of the `HttpMessage` trait for HTTP/2.
///
/// Relies on the `solicit::http::SimpleClient` for HTTP/2 communication. Adapts both outgoing and
/// incoming messages to the API that `hyper` expects in order to be able to use the message in
/// the `hyper::client` module.
pub struct Http2Message<S> where S: CloneableStream {
client: SimpleClient<Http2Stream<S>>,
state: MessageState,
}
impl<S> ::std::fmt::Debug for Http2Message<S> where S: CloneableStream {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> {
write!(f, "<Http2Message>")
}
}
impl<S> Http2Message<S> where S: CloneableStream {
/// Helper method that creates a new completely fresh `Http2Message`, which will use the given
/// `SimpleClient` for its HTTP/2 communication.
fn with_client(client: SimpleClient<Http2Stream<S>>) -> Http2Message<S> {
Http2Message {
client: client,
state: MessageState::Idle,
}
}
}
impl<S> Write for Http2Message<S> where S: CloneableStream {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let MessageState::Writing(ref mut req) = self.state {
req.body.write(buf)
} else {
Err(io::Error::new(io::ErrorKind::Other,
"Not in a writable state"))
}
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
if let MessageState::Writing(ref mut req) = self.state {
req.body.flush()
} else {
Err(io::Error::new(io::ErrorKind::Other,
"Not in a writable state"))
}
}
}
impl<S> Read for Http2Message<S> where S: CloneableStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let MessageState::Reading(ref mut res) = self.state {
res.body.read(buf)
} else {
Err(io::Error::new(io::ErrorKind::Other,
"Not in a readable state"))
}
}
}
/// A helper function that prepares the path of a request by extracting it from the given `Url`.
fn prepare_path(url: Url) -> Vec<u8> {
let mut uri = url.serialize_path().unwrap();
if let Some(ref q) = url.query {
uri.push('?');
uri.push_str(&q[..]);
}
uri.into_bytes()
}
/// A helper function that prepares the headers that should be sent in an HTTP/2 message.
///
/// Adapts the `Headers` into a list of octet string pairs.
fn prepare_headers(mut headers: Headers) -> Vec<Http2Header> {
if headers.remove::<header::Connection>() {
warn!("The `Connection` header is not valid for an HTTP/2 connection.");
}
let mut http2_headers: Vec<_> = headers.iter().filter_map(|h| {
if h.is::<header::SetCookie>() {
None
} else {
// HTTP/2 header names MUST be lowercase.
Some((h.name().to_ascii_lowercase().into_bytes(), h.value_string().into_bytes()))
}
}).collect();
// Now separately add the cookies, as `hyper` considers `Set-Cookie` to be only a single
// header, even in the face of multiple cookies being set.
if let Some(set_cookie) = headers.get::<header::SetCookie>() {
for cookie in set_cookie.iter() {
http2_headers.push((b"set-cookie".to_vec(), cookie.to_string().into_bytes()));
}
}
http2_headers
}
/// A helper function that prepares the body for sending in an HTTP/2 request.
#[inline]
fn prepare_body(body: Vec<u8>) -> Option<Vec<u8>> {
if body.len() == 0 {
None
} else {
Some(body)
}
}
/// Parses a set of HTTP/2 headers into a `hyper::header::Headers` struct.
fn parse_headers(http2_headers: Vec<Http2Header>) -> ::Result<Headers> {
// Adapt the header name from `Vec<u8>` to `String`, without making any copies.
let mut headers = Vec::new();
for (name, value) in http2_headers.into_iter() {
let name = match String::from_utf8(name) {
Ok(name) => name,
Err(_) => return Err(From::from(Http2Error::MalformedResponse)),
};
headers.push((name, value));
}
let mut raw_headers = Vec::new();
for &(ref name, ref value) in headers.iter() {
raw_headers.push(httparse::Header { name: &name, value: &value });
}
Headers::from_raw(&raw_headers)
}
/// Parses the response, as returned by `solicit`, into a `ResponseHead` and the full response
/// body.
///
/// Returns them as a two-tuple.
fn parse_response(response: ::solicit::http::Response) -> ::Result<(ResponseHead, Vec<u8>)> {
let status = try!(response.status_code());
let headers = try!(parse_headers(response.headers));
Ok((ResponseHead {
headers: headers,
raw_status: RawStatus(status, "".into()),
version: version::HttpVersion::Http20,
}, response.body))
}
impl<S> HttpMessage for Http2Message<S> where S: CloneableStream {
fn set_outgoing(&mut self, head: RequestHead) -> ::Result<RequestHead> {
match self.state {
MessageState::Writing(_) | MessageState::Reading(_) => {
return Err(From::from(Http2Error::from(
io::Error::new(io::ErrorKind::Other,
"An outoging has already been set"))));
},
MessageState::Idle => {},
};
self.state = MessageState::Writing(Http2Request {
head: head.clone(),
body: Vec::new(),
});
Ok(head)
}
fn get_incoming(&mut self) -> ::Result<ResponseHead> {
// Prepare the request so that it can be passed off to the HTTP/2 client.
let request = match self.state.take_request() {
None => {
return Err(From::from(Http2Error::from(
io::Error::new(io::ErrorKind::Other,
"No request in progress"))));
},
Some(req) => req,
};
let (RequestHead { headers, method, url }, body) = (request.head, request.body);
let method = method.as_ref().as_bytes();
let path = prepare_path(url);
let extra_headers = prepare_headers(headers);
let body = prepare_body(body);
// Finally, everything is ready and we issue the request.
let stream_id = try!(self.client.request(method, &path, &extra_headers, body));
// Wait for the response
let resp = try!(self.client.get_response(stream_id));
// Now that the response is back, adapt it to the structs that hyper expects/provides.
let (head, body) = try!(parse_response(resp));
// For now, since `solicit` has already read the full response, we just wrap the body into
// a `Cursor` to allow for the public interface to support `io::Read`.
let body = Cursor::new(body);
// The body is saved so that it can be read out from the message.
self.state = MessageState::Reading(Http2Response {
body: body,
});
Ok(head)
}
fn close_connection(&mut self) -> ::Result<()> {
Ok(())
}
}
/// A convenience method that creates a default `Http2Protocol` that uses a `net::HttpConnector`
/// (which produces an `HttpStream` for the underlying transport layer).
#[inline]
pub fn new_protocol() -> Http2Protocol<HttpConnector, HttpStream> {
Http2Protocol::with_connector(HttpConnector(None))
}
#[cfg(test)]
mod tests {
use super::{Http2Protocol, prepare_headers, parse_headers, parse_response};
use std::io::{Read};
use mock::{MockHttp2Connector, MockStream};
use message::{RequestHead, ResponseHead, Protocol};
use header::Headers;
use header;
use url::Url;
use method;
use cookie;
use version;
use solicit::http::connection::{HttpFrame, ReceiveFrame};
/// Tests that the `Http2Message` correctly reads a response with no body.
#[test]
fn test_http2_response_no_body() {
let mut mock_connector = MockHttp2Connector::new();
mock_connector.new_response_stream(b"200", &Headers::new(), None);
let protocol = Http2Protocol::with_connector(mock_connector);
let mut message = protocol.new_message("127.0.0.1", 1337, "http").unwrap();
message.set_outgoing(RequestHead {
headers: Headers::new(),
method: method::Method::Get,
url: Url::parse("http://127.0.0.1/hello").unwrap(),
}).unwrap();
let resp = message.get_incoming().unwrap();
assert_eq!(resp.raw_status.0, 200);
let mut body = Vec::new();
message.read_to_end(&mut body).unwrap();
assert_eq!(body.len(), 0);
}
/// Tests that the `Http2Message` correctly reads a response with a body.
#[test]
fn test_http2_response_with_body() {
let mut mock_connector = MockHttp2Connector::new();
mock_connector.new_response_stream(b"200", &Headers::new(), Some(vec![1, 2, 3]));
let protocol = Http2Protocol::with_connector(mock_connector);
let mut message = protocol.new_message("127.0.0.1", 1337, "http").unwrap();
message.set_outgoing(RequestHead {
headers: Headers::new(),
method: method::Method::Get,
url: Url::parse("http://127.0.0.1/hello").unwrap(),
}).unwrap();
let resp = message.get_incoming().unwrap();
assert_eq!(resp.raw_status.0, 200);
let mut body = Vec::new();
message.read_to_end(&mut body).unwrap();
assert_eq!(vec![1, 2, 3], body);
}
/// Tests that the `Http2Message` correctly reads a response with an empty body.
#[test]
fn test_http2_response_empty_body() {
let mut mock_connector = MockHttp2Connector::new();
mock_connector.new_response_stream(b"200", &Headers::new(), Some(vec![]));
let protocol = Http2Protocol::with_connector(mock_connector);
let mut message = protocol.new_message("127.0.0.1", 1337, "http").unwrap();
message.set_outgoing(RequestHead {
headers: Headers::new(),
method: method::Method::Get,
url: Url::parse("http://127.0.0.1/hello").unwrap(),
}).unwrap();
let resp = message.get_incoming().unwrap();
assert_eq!(resp.raw_status.0, 200);
let mut body = Vec::new();
message.read_to_end(&mut body).unwrap();
assert_eq!(Vec::<u8>::new(), body);
}
/// Tests that the `Http2Message` correctly parses out the headers into the `ResponseHead`.
#[test]
fn test_http2_response_headers() {
let mut mock_connector = MockHttp2Connector::new();
let mut headers = Headers::new();
headers.set(header::ContentLength(3));
headers.set(header::ETag(header::EntityTag::new(true, "tag".into())));
mock_connector.new_response_stream(b"200", &headers, Some(vec![1, 2, 3]));
let protocol = Http2Protocol::with_connector(mock_connector);
let mut message = protocol.new_message("127.0.0.1", 1337, "http").unwrap();
message.set_outgoing(RequestHead {
headers: Headers::new(),
method: method::Method::Get,
url: Url::parse("http://127.0.0.1/hello").unwrap(),
}).unwrap();
let resp = message.get_incoming().unwrap();
assert_eq!(resp.raw_status.0, 200);
assert!(resp.headers.has::<header::ContentLength>());
let &header::ContentLength(len) = resp.headers.get::<header::ContentLength>().unwrap();
assert_eq!(3, len);
assert!(resp.headers.has::<header::ETag>());
let &header::ETag(ref tag) = resp.headers.get::<header::ETag>().unwrap();
assert_eq!(tag.tag(), "tag");
}
/// Tests that an error is returned when the `Http2Message` is not in a readable state.
#[test]
fn test_http2_message_not_readable() {
let mut mock_connector = MockHttp2Connector::new();
mock_connector.new_response_stream(b"200", &Headers::new(), None);
let protocol = Http2Protocol::with_connector(mock_connector);
let mut message = protocol.new_message("127.0.0.1", 1337, "http").unwrap();
// No outgoing set yet, so nothing can be read at this point.
assert!(message.read(&mut [0; 5]).is_err());
}
/// Tests that an error is returned when the `Http2Message` is not in a writable state.
#[test]
fn test_http2_message_not_writable() {
let mut mock_connector = MockHttp2Connector::new();
mock_connector.new_response_stream(b"200", &Headers::new(), None);
let protocol = Http2Protocol::with_connector(mock_connector);
let mut message = protocol.new_message("127.0.0.1", 1337, "http").unwrap();
message.set_outgoing(RequestHead {
headers: Headers::new(),
method: method::Method::Get,
url: Url::parse("http://127.0.0.1/hello").unwrap(),
}).unwrap();
let _ = message.get_incoming().unwrap();
// Writes are invalid now
assert!(message.write(&[1]).is_err());
}
/// Asserts that the given stream contains the full expected client preface: the preface bytes,
/// settings frame, and settings ack frame.
fn assert_client_preface(server_stream: &mut MockStream) {
// Skip client preface
server_stream.read(&mut [0; 24]).unwrap();
// The first frame are the settings
assert!(match server_stream.recv_frame().unwrap() {
HttpFrame::SettingsFrame(_) => true,
_ => false,
});
// Now the ACK to the server's settings.
assert!(match server_stream.recv_frame().unwrap() {
HttpFrame::SettingsFrame(_) => true,
_ => false,
});
}
/// Tests that sending a request with no body works correctly.
#[test]
fn test_http2_request_no_body() {
let mut mock_connector = MockHttp2Connector::new();
let stream = mock_connector.new_response_stream(b"200", &Headers::new(), Some(vec![]));
let protocol = Http2Protocol::with_connector(mock_connector);
let mut message = protocol.new_message("127.0.0.1", 1337, "http").unwrap();
message.set_outgoing(RequestHead {
headers: Headers::new(),
method: method::Method::Get,
url: Url::parse("http://127.0.0.1/hello").unwrap(),
}).unwrap();
let _ = message.get_incoming().unwrap();
let stream = stream.inner.lock().unwrap();
assert!(stream.write.len() > 0);
// The output stream of the client side gets flipped so that we can read the stream from
// the server's end.
let mut server_stream = MockStream::with_input(&stream.write);
assert_client_preface(&mut server_stream);
let frame = server_stream.recv_frame().unwrap();
assert!(match frame {
HttpFrame::HeadersFrame(ref frame) => frame.is_end_of_stream(),
_ => false,
});
}
/// Tests that sending a request with a body works correctly.
#[test]
fn test_http2_request_with_body() {
let mut mock_connector = MockHttp2Connector::new();
let stream = mock_connector.new_response_stream(b"200", &Headers::new(), None);
let protocol = Http2Protocol::with_connector(mock_connector);
let mut message = protocol.new_message("127.0.0.1", 1337, "http").unwrap();
message.set_outgoing(RequestHead {
headers: Headers::new(),
method: method::Method::Get,
url: Url::parse("http://127.0.0.1/hello").unwrap(),
}).unwrap();
// Write a few things to the request in multiple writes.
message.write(&[1]).unwrap();
message.write(&[2, 3]).unwrap();
let _ = message.get_incoming().unwrap();
let stream = stream.inner.lock().unwrap();
assert!(stream.write.len() > 0);
// The output stream of the client side gets flipped so that we can read the stream from
// the server's end.
let mut server_stream = MockStream::with_input(&stream.write);
assert_client_preface(&mut server_stream);
let frame = server_stream.recv_frame().unwrap();
assert!(match frame {
HttpFrame::HeadersFrame(ref frame) => !frame.is_end_of_stream(),
_ => false,
});
assert!(match server_stream.recv_frame().unwrap() {
HttpFrame::DataFrame(ref frame) => frame.data == vec![1, 2, 3],
_ => false,
});
}
/// Tests that headers are correctly prepared when they include a `Set-Cookie` header.
#[test]
fn test_http2_prepare_headers_with_set_cookie() {
let cookies = header::SetCookie(vec![
cookie::Cookie::new("foo".to_owned(), "bar".to_owned()),
cookie::Cookie::new("baz".to_owned(), "quux".to_owned())
]);
let mut headers = Headers::new();
headers.set(cookies);
let h2headers = prepare_headers(headers);
assert_eq!(vec![
(b"set-cookie".to_vec(), b"foo=bar; Path=/".to_vec()),
(b"set-cookie".to_vec(), b"baz=quux; Path=/".to_vec()),
], h2headers);
}
/// Tests that headers are correctly prepared when they include a `Cookie` header.
#[test]
fn test_http2_prepapre_headers_with_cookie() {
let cookies = header::Cookie(vec![
cookie::Cookie::new("foo".to_owned(), "bar".to_owned()),
cookie::Cookie::new("baz".to_owned(), "quux".to_owned())
]);
let mut headers = Headers::new();
headers.set(cookies);
let h2headers = prepare_headers(headers);
assert_eq!(vec![
(b"cookie".to_vec(), b"foo=bar; baz=quux".to_vec()),
], h2headers);
}
/// Tests that HTTP/2 headers are correctly prepared.
#[test]
fn test_http2_prepare_headers() {
let mut headers = Headers::new();
headers.set(header::ContentLength(3));
let expected = vec![
(b"content-length".to_vec(), b"3".to_vec()),
];
assert_eq!(expected, prepare_headers(headers));
}
/// Tests that the headers of a response are correctly parsed when they include a `Set-Cookie`
/// header.
#[test]
fn test_http2_parse_headers_with_set_cookie() {
let h2headers = vec![
(b"set-cookie".to_vec(), b"foo=bar; Path=/".to_vec()),
(b"set-cookie".to_vec(), b"baz=quux; Path=/".to_vec()),
];
let expected = header::SetCookie(vec![
cookie::Cookie::new("foo".to_owned(), "bar".to_owned()),
cookie::Cookie::new("baz".to_owned(), "quux".to_owned())
]);
let headers = parse_headers(h2headers).unwrap();
assert!(headers.has::<header::SetCookie>());
let set_cookie = headers.get::<header::SetCookie>().unwrap();
assert_eq!(expected, *set_cookie);
}
/// Tests that parsing HTTP/2 headers with `Cookie` headers works correctly.
#[test]
fn test_http2_parse_headers_with_cookie() {
let expected = header::Cookie(vec![
cookie::Cookie::new("foo".to_owned(), "bar".to_owned()),
cookie::Cookie::new("baz".to_owned(), "quux".to_owned())
]);
// HTTP/2 allows the `Cookie` header to be split into multiple ones to facilitate better
// compression.
let h2headers = vec![
(b"cookie".to_vec(), b"foo=bar".to_vec()),
(b"cookie".to_vec(), b"baz=quux".to_vec()),
];
let headers = parse_headers(h2headers).unwrap();
assert!(headers.has::<header::Cookie>());
assert_eq!(*headers.get::<header::Cookie>().unwrap(), expected);
}
/// Tests that the headers of a response are correctly parsed.
#[test]
fn test_http2_parse_headers() {
let h2headers = vec![
(b":status".to_vec(), b"200".to_vec()),
(b"content-length".to_vec(), b"3".to_vec()),
];
let headers = parse_headers(h2headers).unwrap();
assert!(headers.has::<header::ContentLength>());
let &header::ContentLength(len) = headers.get::<header::ContentLength>().unwrap();
assert_eq!(3, len);
}
/// Tests that if a header name is not a valid utf8 byte sequence, an error is returned.
#[test]
fn test_http2_parse_headers_invalid_name() {
let h2headers = vec![
(vec![0xfe], vec![]),
];
assert!(parse_headers(h2headers).is_err());
}
/// Tests that a response with no pseudo-header for status is considered invalid.
#[test]
fn test_http2_parse_response_no_status_code() {
let response = ::solicit::http::Response {
body: Vec::new(),
headers: vec![
(b"content-length".to_vec(), b"3".to_vec()),
],
stream_id: 1,
};
assert!(parse_response(response).is_err());
}
/// Tests that an HTTP/2 response gets correctly parsed into a body and response head, when
/// the body is empty.
#[test]
fn test_http2_parse_response_no_body() {
let response = ::solicit::http::Response {
body: Vec::new(),
headers: vec![
(b":status".to_vec(), b"200".to_vec()),
(b"content-length".to_vec(), b"0".to_vec()),
],
stream_id: 1,
};
let (head, body) = parse_response(response).unwrap();
assert_eq!(body, vec![]);
let ResponseHead { headers, raw_status, version } = head;
assert_eq!(raw_status.0, 200);
assert_eq!(raw_status.1, "");
assert!(headers.has::<header::ContentLength>());
assert_eq!(version, version::HttpVersion::Http20);
}
/// Tests that an HTTP/2 response gets correctly parsed into a body and response head, when
/// the body is not empty.
#[test]
fn test_http2_parse_response_with_body() {
let expected_body = vec![1, 2, 3];
let response = ::solicit::http::Response {
body: expected_body.clone(),
headers: vec![
(b":status".to_vec(), b"200".to_vec()),
(b"content-length".to_vec(), b"3".to_vec()),
],
stream_id: 1,
};
let (head, body) = parse_response(response).unwrap();
assert_eq!(body, expected_body);
let ResponseHead { headers, raw_status, version } = head;
assert_eq!(raw_status.0, 200);
assert_eq!(raw_status.1, "");
assert!(headers.has::<header::ContentLength>());
assert_eq!(version, version::HttpVersion::Http20);
}
}

View File

@@ -138,6 +138,7 @@ extern crate httparse;
extern crate num_cpus;
extern crate traitobject;
extern crate typeable;
extern crate solicit;
#[macro_use]
extern crate mime as mime_crate;
@@ -185,6 +186,9 @@ pub mod server;
pub mod status;
pub mod uri;
pub mod version;
pub mod message;
pub mod http11;
pub mod http2;
/// Re-exporting the mime crate, for convenience.

124
src/message.rs Normal file
View File

@@ -0,0 +1,124 @@
//! Defines the `HttpMessage` trait that serves to encapsulate the operations of a single
//! request-response cycle on any HTTP connection.
use std::fmt::Debug;
use std::any::{Any, TypeId};
use std::io::{Read, Write};
use std::mem;
use typeable::Typeable;
use header::Headers;
use http::RawStatus;
use url::Url;
use method;
use version;
use traitobject;
use net::ContextVerifier;
/// The trait provides an API for creating new `HttpMessage`s depending on the underlying HTTP
/// protocol.
pub trait Protocol {
/// Creates a fresh `HttpMessage` bound to the given host, based on the given protocol scheme.
fn new_message(&self, host: &str, port: u16, scheme: &str) -> ::Result<Box<HttpMessage>>;
/// Sets the SSL verifier that should be used when establishing TLS-protected connections.
fn set_ssl_verifier(&mut self, verifier: ContextVerifier);
}
/// Describes a request.
#[derive(Clone, Debug)]
pub struct RequestHead {
/// The headers of the request
pub headers: Headers,
/// The method of the request
pub method: method::Method,
/// The URL of the request
pub url: Url,
}
/// Describes a response.
#[derive(Clone, Debug)]
pub struct ResponseHead {
/// The headers of the reponse
pub headers: Headers,
/// The raw status line of the response
pub raw_status: RawStatus,
/// The HTTP/2 version which generated the response
pub version: version::HttpVersion,
}
/// The trait provides an API for sending an receiving HTTP messages.
pub trait HttpMessage: Write + Read + Send + Any + Typeable + Debug {
/// Initiates a new outgoing request.
///
/// Only the request's head is provided (in terms of the `RequestHead` struct).
///
/// After this, the `HttpMessage` instance can be used as an `io::Write` in order to write the
/// body of the request.
fn set_outgoing(&mut self, head: RequestHead) -> ::Result<RequestHead>;
/// Obtains the incoming response and returns its head (i.e. the `ResponseHead` struct)
///
/// After this, the `HttpMessage` instance can be used as an `io::Read` in order to read out
/// the response body.
fn get_incoming(&mut self) -> ::Result<ResponseHead>;
/// Closes the underlying HTTP connection.
fn close_connection(&mut self) -> ::Result<()>;
}
impl HttpMessage {
unsafe fn downcast_ref_unchecked<T: 'static>(&self) -> &T {
mem::transmute(traitobject::data(self))
}
unsafe fn downcast_mut_unchecked<T: 'static>(&mut self) -> &mut T {
mem::transmute(traitobject::data_mut(self))
}
unsafe fn downcast_unchecked<T: 'static>(self: Box<HttpMessage>) -> Box<T> {
let raw: *mut HttpMessage = mem::transmute(self);
mem::transmute(traitobject::data_mut(raw))
}
}
impl HttpMessage {
/// Is the underlying type in this trait object a T?
#[inline]
pub fn is<T: Any>(&self) -> bool {
(*self).get_type() == TypeId::of::<T>()
}
/// If the underlying type is T, get a reference to the contained data.
#[inline]
pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
if self.is::<T>() {
Some(unsafe { self.downcast_ref_unchecked() })
} else {
None
}
}
/// If the underlying type is T, get a mutable reference to the contained
/// data.
#[inline]
pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
if self.is::<T>() {
Some(unsafe { self.downcast_mut_unchecked() })
} else {
None
}
}
/// If the underlying type is T, extract it.
#[inline]
pub fn downcast<T: Any>(self: Box<HttpMessage>)
-> Result<Box<T>, Box<HttpMessage>> {
if self.is::<T>() {
Ok(unsafe { self.downcast_unchecked() })
} else {
Err(self)
}
}
}

View File

@@ -1,8 +1,17 @@
use std::fmt;
use std::ascii::AsciiExt;
use std::io::{self, Read, Write, Cursor};
use std::cell::RefCell;
use std::net::SocketAddr;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use solicit::http::HttpScheme;
use solicit::http::transport::TransportStream;
use solicit::http::frame::{SettingsFrame, Frame};
use solicit::http::connection::{HttpConnection, EndStream, DataChunk};
use header::Headers;
use net::{NetworkStream, NetworkConnector, ContextVerifier};
pub struct MockStream {
@@ -69,6 +78,53 @@ impl NetworkStream for MockStream {
}
}
/// A wrapper around a `MockStream` that allows one to clone it and keep an independent copy to the
/// same underlying stream.
#[derive(Clone)]
pub struct CloneableMockStream {
pub inner: Arc<Mutex<MockStream>>,
}
impl Write for CloneableMockStream {
fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
self.inner.lock().unwrap().write(msg)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.lock().unwrap().flush()
}
}
impl Read for CloneableMockStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.lock().unwrap().read(buf)
}
}
impl TransportStream for CloneableMockStream {
fn try_split(&self) -> Result<CloneableMockStream, io::Error> {
Ok(self.clone())
}
fn close(&mut self) -> Result<(), io::Error> {
Ok(())
}
}
impl NetworkStream for CloneableMockStream {
fn peer_addr(&mut self) -> io::Result<SocketAddr> {
self.inner.lock().unwrap().peer_addr()
}
}
impl CloneableMockStream {
pub fn with_stream(stream: MockStream) -> CloneableMockStream {
CloneableMockStream {
inner: Arc::new(Mutex::new(stream)),
}
}
}
pub struct MockConnector;
impl NetworkConnector for MockConnector {
@@ -149,3 +205,100 @@ macro_rules! mock_connector (
)
);
impl TransportStream for MockStream {
fn try_split(&self) -> Result<MockStream, io::Error> {
Ok(self.clone())
}
fn close(&mut self) -> Result<(), io::Error> {
Ok(())
}
}
impl MockStream {
/// Creates a new `MockStream` that will return the response described by the parameters as an
/// HTTP/2 response. This will also include the correct server preface.
pub fn new_http2_response(status: &[u8], headers: &Headers, body: Option<Vec<u8>>)
-> MockStream {
let resp_bytes = build_http2_response(status, headers, body);
MockStream::with_input(&resp_bytes)
}
}
/// Builds up a sequence of bytes that represent a server's response based on the given parameters.
pub fn build_http2_response(status: &[u8], headers: &Headers, body: Option<Vec<u8>>) -> Vec<u8> {
let mut conn = HttpConnection::new(MockStream::new(), MockStream::new(), HttpScheme::Http);
// Server preface first
conn.sender.write(&SettingsFrame::new().serialize()).unwrap();
let mut resp_headers: Vec<_> = headers.iter().map(|h| {
(h.name().to_ascii_lowercase().into_bytes(), h.value_string().into_bytes())
}).collect();
resp_headers.insert(0, (b":status".to_vec(), status.into()));
let end = if body.is_none() {
EndStream::Yes
} else {
EndStream::No
};
conn.send_headers(resp_headers, 1, end).unwrap();
if body.is_some() {
let chunk = DataChunk::new_borrowed(&body.as_ref().unwrap()[..], 1, EndStream::Yes);
conn.send_data(chunk).unwrap();
}
conn.sender.write
}
/// A mock connector that produces `MockStream`s that are set to return HTTP/2 responses.
///
/// This means that the streams' payloads are fairly opaque byte sequences (as HTTP/2 is a binary
/// protocol), which can be understood only be HTTP/2 clients.
pub struct MockHttp2Connector {
/// The list of streams that the connector returns, in the given order.
pub streams: RefCell<Vec<CloneableMockStream>>,
}
impl MockHttp2Connector {
/// Creates a new `MockHttp2Connector` with no streams.
pub fn new() -> MockHttp2Connector {
MockHttp2Connector {
streams: RefCell::new(Vec::new()),
}
}
/// Adds a new `CloneableMockStream` to the end of the connector's stream queue.
///
/// Streams are returned in a FIFO manner.
pub fn add_stream(&mut self, stream: CloneableMockStream) {
self.streams.borrow_mut().push(stream);
}
/// Adds a new response stream that will be placed to the end of the connector's stream queue.
///
/// Returns a separate `CloneableMockStream` that allows the user to inspect what is written
/// into the original stream.
pub fn new_response_stream(&mut self, status: &[u8], headers: &Headers, body: Option<Vec<u8>>)
-> CloneableMockStream {
let stream = MockStream::new_http2_response(status, headers, body);
let stream = CloneableMockStream::with_stream(stream);
let ret = stream.clone();
self.add_stream(stream);
ret
}
}
impl NetworkConnector for MockHttp2Connector {
type Stream = CloneableMockStream;
#[inline]
fn connect(&self, _host: &str, _port: u16, _scheme: &str)
-> ::Result<CloneableMockStream> {
Ok(self.streams.borrow_mut().remove(0))
}
#[inline]
fn set_ssl_verifier(&mut self, _verifier: ContextVerifier) {
// pass
}
}