refactor(client): use HttpMessage in Request and Response

BREAKING CHANGE: `hyper::client::request::Response` is no longer generic
over `NetworkStream` types. It no longer requires a generic type
parameter at all.
This commit is contained in:
Marko Lalic
2015-05-25 22:18:26 +02:00
parent ecb713f849
commit aa297f4532
2 changed files with 70 additions and 148 deletions

View File

@@ -1,18 +1,19 @@
//! Client Requests //! Client Requests
use std::marker::PhantomData; use std::marker::PhantomData;
use std::io::{self, Write, BufWriter}; use std::io::{self, Write};
use url::Url; use url::Url;
use method::{self, Method}; use method::{self, Method};
use header::Headers; use header::Headers;
use header::{self, Host}; use header::Host;
use net::{NetworkStream, NetworkConnector, HttpConnector, Fresh, Streaming}; use net::{NetworkStream, NetworkConnector, HttpConnector, Fresh, Streaming};
use http::{HttpWriter, LINE_ENDING};
use http::HttpWriter::{ThroughWriter, ChunkedWriter, SizedWriter, EmptyWriter};
use version; use version;
use client::{Response, get_host_and_port}; use client::{Response, get_host_and_port};
use message::{HttpMessage, RequestHead};
use http11::Http11Message;
/// A client request to a remote server. /// A client request to a remote server.
/// The W type tracks the state of the request, Fresh vs Streaming. /// The W type tracks the state of the request, Fresh vs Streaming.
@@ -23,7 +24,7 @@ pub struct Request<W> {
/// The HTTP version of this request. /// The HTTP version of this request.
pub version: version::HttpVersion, pub version: version::HttpVersion,
body: HttpWriter<BufWriter<Box<NetworkStream + Send>>>, message: Box<HttpMessage>,
headers: Headers, headers: Headers,
method: method::Method, method: method::Method,
@@ -41,6 +42,28 @@ impl<W> Request<W> {
} }
impl Request<Fresh> { impl Request<Fresh> {
/// Create a new `Request<Fresh>` that will use the given `HttpMessage` for its communication
/// with the server. This implies that the given `HttpMessage` instance has already been
/// properly initialized by the caller (e.g. a TCP connection's already established).
pub fn with_message(method: method::Method, url: Url, message: Box<HttpMessage>)
-> ::Result<Request<Fresh>> {
let (host, port) = try!(get_host_and_port(&url));
let mut headers = Headers::new();
headers.set(Host {
hostname: host,
port: Some(port),
});
Ok(Request {
method: method,
headers: headers,
url: url,
version: version::HttpVersion::Http11,
message: message,
_marker: PhantomData,
})
}
/// Create a new client request. /// Create a new client request.
pub fn new(method: method::Method, url: Url) -> ::Result<Request<Fresh>> { pub fn new(method: method::Method, url: Url) -> ::Result<Request<Fresh>> {
let mut conn = HttpConnector(None); let mut conn = HttpConnector(None);
@@ -53,92 +76,26 @@ impl Request<Fresh> {
C: NetworkConnector<Stream=S>, C: NetworkConnector<Stream=S>,
S: Into<Box<NetworkStream + Send>> { S: Into<Box<NetworkStream + Send>> {
let (host, port) = try!(get_host_and_port(&url)); let (host, port) = try!(get_host_and_port(&url));
let stream = try!(connector.connect(&*host, port, &*url.scheme)).into(); let stream = try!(connector.connect(&*host, port, &*url.scheme)).into();
let stream = ThroughWriter(BufWriter::new(stream));
let mut headers = Headers::new(); Request::with_message(method, url, Box::new(Http11Message::with_stream(stream)))
headers.set(Host {
hostname: host,
port: Some(port),
});
Ok(Request {
method: method,
headers: headers,
url: url,
version: version::HttpVersion::Http11,
body: stream,
_marker: PhantomData,
})
} }
/// Consume a Fresh Request, writing the headers and method, /// Consume a Fresh Request, writing the headers and method,
/// returning a Streaming Request. /// returning a Streaming Request.
pub fn start(mut self) -> ::Result<Request<Streaming>> { pub fn start(mut self) -> ::Result<Request<Streaming>> {
let mut uri = self.url.serialize_path().unwrap(); let head = try!(self.message.set_outgoing(RequestHead {
if let Some(ref q) = self.url.query { headers: self.headers,
uri.push('?'); method: self.method,
uri.push_str(&q[..]); url: self.url,
} }));
debug!("request line: {:?} {:?} {:?}", self.method, uri, self.version);
try!(write!(&mut self.body, "{} {} {}{}",
self.method, uri, self.version, LINE_ENDING));
let stream = match self.method {
Method::Get | Method::Head => {
debug!("headers={:?}", self.headers);
try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING));
EmptyWriter(self.body.into_inner())
},
_ => {
let mut chunked = true;
let mut len = 0;
match self.headers.get::<header::ContentLength>() {
Some(cl) => {
chunked = false;
len = **cl;
},
None => ()
};
// can't do in match above, thanks borrowck
if chunked {
let encodings = match self.headers.get_mut::<header::TransferEncoding>() {
Some(&mut header::TransferEncoding(ref mut encodings)) => {
//TODO: check if chunked is already in encodings. use HashSet?
encodings.push(header::Encoding::Chunked);
false
},
None => true
};
if encodings {
self.headers.set::<header::TransferEncoding>(
header::TransferEncoding(vec![header::Encoding::Chunked]))
}
}
debug!("headers={:?}", self.headers);
try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING));
if chunked {
ChunkedWriter(self.body.into_inner())
} else {
SizedWriter(self.body.into_inner(), len)
}
}
};
Ok(Request { Ok(Request {
method: self.method, method: head.method,
headers: self.headers, headers: head.headers,
url: self.url, url: head.url,
version: self.version, version: self.version,
body: stream, message: self.message,
_marker: PhantomData, _marker: PhantomData,
}) })
} }
@@ -153,20 +110,19 @@ impl Request<Streaming> {
/// ///
/// Consumes the Request. /// Consumes the Request.
pub fn send(self) -> ::Result<Response> { pub fn send(self) -> ::Result<Response> {
let raw = try!(self.body.end()).into_inner().unwrap(); // end() already flushes Response::with_message(self.message)
Response::new(raw)
} }
} }
impl Write for Request<Streaming> { impl Write for Request<Streaming> {
#[inline] #[inline]
fn write(&mut self, msg: &[u8]) -> io::Result<usize> { fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
self.body.write(msg) self.message.write(msg)
} }
#[inline] #[inline]
fn flush(&mut self) -> io::Result<()> { fn flush(&mut self) -> io::Result<()> {
self.body.flush() self.message.flush()
} }
} }
@@ -180,11 +136,15 @@ mod tests {
use header::{ContentLength,TransferEncoding,Encoding}; use header::{ContentLength,TransferEncoding,Encoding};
use url::form_urlencoded; use url::form_urlencoded;
use super::Request; use super::Request;
use http11::Http11Message;
fn run_request(req: Request<Fresh>) -> Vec<u8> { fn run_request(req: Request<Fresh>) -> Vec<u8> {
let req = req.start().unwrap(); let req = req.start().unwrap();
let stream = *req.body.end().unwrap() let message = req.message;
.into_inner().unwrap().downcast::<MockStream>().ok().unwrap(); let mut message = message.downcast::<Http11Message>().ok().unwrap();
message.flush_outgoing().unwrap();
let stream = *message
.into_inner().downcast::<MockStream>().ok().unwrap();
stream.write stream.write
} }

View File

@@ -1,21 +1,17 @@
//! Client Responses //! Client Responses
use std::io::{self, Read}; use std::io::{self, Read};
use std::marker::PhantomData;
use std::net::Shutdown;
use buffer::BufReader;
use header; use header;
use header::{ContentLength, TransferEncoding}; use net::NetworkStream;
use header::Encoding::Chunked; use http::{self, RawStatus};
use net::{NetworkStream, HttpStream};
use http::{self, HttpReader, RawStatus};
use http::HttpReader::{SizedReader, ChunkedReader, EofReader};
use status; use status;
use version; use version;
use message::{ResponseHead, HttpMessage};
use http11::Http11Message;
/// A response for a client request to a remote server. /// A response for a client request to a remote server.
#[derive(Debug)] #[derive(Debug)]
pub struct Response<S = HttpStream> { pub struct Response {
/// The status from the server. /// The status from the server.
pub status: status::StatusCode, pub status: status::StatusCode,
/// The headers from the server. /// The headers from the server.
@@ -23,9 +19,7 @@ pub struct Response<S = HttpStream> {
/// The HTTP version of this response from the server. /// The HTTP version of this response from the server.
pub version: version::HttpVersion, pub version: version::HttpVersion,
status_raw: RawStatus, status_raw: RawStatus,
body: HttpReader<BufReader<Box<NetworkStream + Send>>>, message: Box<HttpMessage>,
_marker: PhantomData<S>,
} }
impl Response { impl Response {
@@ -33,50 +27,23 @@ impl Response {
/// Creates a new response from a server. /// Creates a new response from a server.
pub fn new(stream: Box<NetworkStream + Send>) -> ::Result<Response> { pub fn new(stream: Box<NetworkStream + Send>) -> ::Result<Response> {
trace!("Response::new"); trace!("Response::new");
let mut stream = BufReader::new(stream); Response::with_message(Box::new(Http11Message::with_stream(stream)))
}
let head = try!(http::parse_response(&mut stream));
let raw_status = head.subject;
let headers = head.headers;
/// Creates a new response received from the server on the given `HttpMessage`.
pub fn with_message(mut message: Box<HttpMessage>) -> ::Result<Response> {
trace!("Response::with_message");
let ResponseHead { headers, raw_status, version } = try!(message.get_incoming());
let status = status::StatusCode::from_u16(raw_status.0); let status = status::StatusCode::from_u16(raw_status.0);
debug!("version={:?}, status={:?}", head.version, status); debug!("version={:?}, status={:?}", version, status);
debug!("headers={:?}", headers); debug!("headers={:?}", headers);
let body = if headers.has::<TransferEncoding>() {
match headers.get::<TransferEncoding>() {
Some(&TransferEncoding(ref codings)) => {
if codings.len() > 1 {
trace!("TODO: #2 handle other codings: {:?}", codings);
};
if codings.contains(&Chunked) {
ChunkedReader(stream, None)
} else {
trace!("not chuncked. read till eof");
EofReader(stream)
}
}
None => unreachable!()
}
} else if headers.has::<ContentLength>() {
match headers.get::<ContentLength>() {
Some(&ContentLength(len)) => SizedReader(stream, len),
None => unreachable!()
}
} else {
trace!("neither Transfer-Encoding nor Content-Length");
EofReader(stream)
};
Ok(Response { Ok(Response {
status: status, status: status,
version: head.version, version: version,
headers: headers, headers: headers,
body: body, message: message,
status_raw: raw_status, status_raw: raw_status,
_marker: PhantomData,
}) })
} }
@@ -84,21 +51,18 @@ impl Response {
pub fn status_raw(&self) -> &RawStatus { pub fn status_raw(&self) -> &RawStatus {
&self.status_raw &self.status_raw
} }
/// Consumes the Request to return the NetworkStream underneath.
pub fn into_inner(self) -> Box<NetworkStream + Send> {
self.body.into_inner().into_inner()
}
} }
impl Read for Response { impl Read for Response {
#[inline] #[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let count = try!(self.body.read(buf)); let count = try!(self.message.read(buf));
if count == 0 { if count == 0 {
if !http::should_keep_alive(self.version, &self.headers) { if !http::should_keep_alive(self.version, &self.headers) {
try!(self.body.get_mut().get_mut().close(Shutdown::Both)); try!(self.message.close_connection()
.map_err(|_| io::Error::new(io::ErrorKind::Other,
"Error closing connection")));
} }
} }
@@ -110,17 +74,15 @@ impl Read for Response {
mod tests { mod tests {
use std::borrow::Cow::Borrowed; use std::borrow::Cow::Borrowed;
use std::io::{self, Read}; use std::io::{self, Read};
use std::marker::PhantomData;
use buffer::BufReader;
use header::Headers; use header::Headers;
use header::TransferEncoding; use header::TransferEncoding;
use header::Encoding; use header::Encoding;
use http::HttpReader::EofReader;
use http::RawStatus; use http::RawStatus;
use mock::MockStream; use mock::MockStream;
use status; use status;
use version; use version;
use http11::Http11Message;
use super::Response; use super::Response;
@@ -137,12 +99,12 @@ mod tests {
status: status::StatusCode::Ok, status: status::StatusCode::Ok,
headers: Headers::new(), headers: Headers::new(),
version: version::HttpVersion::Http11, version: version::HttpVersion::Http11,
body: EofReader(BufReader::new(Box::new(MockStream::new()))), message: Box::new(Http11Message::with_stream(Box::new(MockStream::new()))),
status_raw: RawStatus(200, Borrowed("OK")), status_raw: RawStatus(200, Borrowed("OK")),
_marker: PhantomData,
}; };
let b = res.into_inner().downcast::<MockStream>().ok().unwrap(); let message = res.message.downcast::<Http11Message>().ok().unwrap();
let b = message.into_inner().downcast::<MockStream>().ok().unwrap();
assert_eq!(b, Box::new(MockStream::new())); assert_eq!(b, Box::new(MockStream::new()));
} }