@@ -1,118 +1,7 @@
|
||||
pub use self::decode::Decoder;
|
||||
pub use self::encode::Encoder;
|
||||
|
||||
pub use self::parse::parse;
|
||||
|
||||
mod decode;
|
||||
mod encode;
|
||||
pub mod parse;
|
||||
|
||||
/*
|
||||
fn should_have_response_body(method: &Method, status: u16) -> bool {
|
||||
trace!("should_have_response_body({:?}, {})", method, status);
|
||||
match (method, status) {
|
||||
(&Method::Head, _) |
|
||||
(_, 100...199) |
|
||||
(_, 204) |
|
||||
(_, 304) |
|
||||
(&Method::Connect, 200...299) => false,
|
||||
_ => true
|
||||
}
|
||||
}
|
||||
*/
|
||||
/*
|
||||
const MAX_INVALID_RESPONSE_BYTES: usize = 1024 * 128;
|
||||
impl HttpMessage for Http11Message {
|
||||
|
||||
fn get_incoming(&mut self) -> ::Result<ResponseHead> {
|
||||
unimplemented!();
|
||||
/*
|
||||
try!(self.flush_outgoing());
|
||||
let stream = match self.stream.take() {
|
||||
Some(stream) => stream,
|
||||
None => {
|
||||
// The message was already in the reading state...
|
||||
// TODO Decide what happens in case we try to get a new incoming at that point
|
||||
return Err(From::from(
|
||||
io::Error::new(io::ErrorKind::Other,
|
||||
"Read already in progress")));
|
||||
}
|
||||
};
|
||||
|
||||
let expected_no_content = stream.previous_response_expected_no_content();
|
||||
trace!("previous_response_expected_no_content = {}", expected_no_content);
|
||||
|
||||
let mut stream = BufReader::new(stream);
|
||||
|
||||
let mut invalid_bytes_read = 0;
|
||||
let head;
|
||||
loop {
|
||||
head = match parse_response(&mut stream) {
|
||||
Ok(head) => head,
|
||||
Err(::Error::Version)
|
||||
if expected_no_content && invalid_bytes_read < MAX_INVALID_RESPONSE_BYTES => {
|
||||
trace!("expected_no_content, found content");
|
||||
invalid_bytes_read += 1;
|
||||
stream.consume(1);
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
self.stream = Some(stream.into_inner());
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
break;
|
||||
}
|
||||
|
||||
let raw_status = head.subject;
|
||||
let headers = head.headers;
|
||||
|
||||
let method = self.method.take().unwrap_or(Method::Get);
|
||||
|
||||
let is_empty = !should_have_response_body(&method, raw_status.0);
|
||||
stream.get_mut().set_previous_response_expected_no_content(is_empty);
|
||||
// According to https://tools.ietf.org/html/rfc7230#section-3.3.3
|
||||
// 1. HEAD responses, and Status 1xx, 204, and 304 cannot have a body.
|
||||
// 2. Status 2xx to a CONNECT cannot have a body.
|
||||
// 3. Transfer-Encoding: chunked has a chunked body.
|
||||
// 4. If multiple differing Content-Length headers or invalid, close connection.
|
||||
// 5. Content-Length header has a sized body.
|
||||
// 6. Not Client.
|
||||
// 7. Read till EOF.
|
||||
self.reader = Some(if is_empty {
|
||||
SizedReader(stream, 0)
|
||||
} else {
|
||||
if let Some(&TransferEncoding(ref codings)) = headers.get() {
|
||||
if codings.last() == Some(&Chunked) {
|
||||
ChunkedReader(stream, None)
|
||||
} else {
|
||||
trace!("not chunked. read till eof");
|
||||
EofReader(stream)
|
||||
}
|
||||
} else if let Some(&ContentLength(len)) = headers.get() {
|
||||
SizedReader(stream, len)
|
||||
} else if headers.has::<ContentLength>() {
|
||||
trace!("illegal Content-Length: {:?}", headers.get_raw("Content-Length"));
|
||||
return Err(Error::Header);
|
||||
} else {
|
||||
trace!("neither Transfer-Encoding nor Content-Length");
|
||||
EofReader(stream)
|
||||
}
|
||||
});
|
||||
|
||||
trace!("Http11Message.reader = {:?}", self.reader);
|
||||
|
||||
|
||||
Ok(ResponseHead {
|
||||
headers: headers,
|
||||
raw_status: raw_status,
|
||||
version: head.version,
|
||||
})
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
*/
|
||||
|
||||
|
||||
|
||||
@@ -15,19 +15,15 @@ use version::HttpVersion::{Http10, Http11};
|
||||
const MAX_HEADERS: usize = 100;
|
||||
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
|
||||
|
||||
pub fn parse<T: Http1Transaction<Incoming=I>, I>(buf: &mut BytesMut) -> ParseResult<I> {
|
||||
if buf.len() == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
trace!("parse({:?})", buf);
|
||||
<T as Http1Transaction>::parse(buf)
|
||||
}
|
||||
|
||||
impl Http1Transaction for ServerTransaction {
|
||||
type Incoming = RequestLine;
|
||||
type Outgoing = StatusCode;
|
||||
|
||||
fn parse(buf: &mut BytesMut) -> ParseResult<RequestLine> {
|
||||
if buf.len() == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
trace!("parse({:?})", buf);
|
||||
let mut headers_indices = [HeaderIndices {
|
||||
name: (0, 0),
|
||||
value: (0, 0)
|
||||
@@ -145,6 +141,10 @@ impl Http1Transaction for ClientTransaction {
|
||||
type Outgoing = RequestLine;
|
||||
|
||||
fn parse(buf: &mut BytesMut) -> ParseResult<RawStatus> {
|
||||
if buf.len() == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
trace!("parse({:?})", buf);
|
||||
let mut headers_indices = [HeaderIndices {
|
||||
name: (0, 0),
|
||||
value: (0, 0)
|
||||
@@ -332,9 +332,8 @@ fn extend(dst: &mut Vec<u8>, data: &[u8]) {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use http;
|
||||
use http::{ServerTransaction, ClientTransaction, Http1Transaction};
|
||||
use bytes::BytesMut;
|
||||
use super::{parse};
|
||||
|
||||
#[test]
|
||||
fn test_parse_request() {
|
||||
@@ -342,7 +341,7 @@ mod tests {
|
||||
let _ = pretty_env_logger::init();
|
||||
let mut raw = BytesMut::from(b"GET /echo HTTP/1.1\r\nHost: hyper.rs\r\n\r\n".to_vec());
|
||||
let expected_len = raw.len();
|
||||
let (req, len) = parse::<http::ServerTransaction, _>(&mut raw).unwrap().unwrap();
|
||||
let (req, len) = ServerTransaction::parse(&mut raw).unwrap().unwrap();
|
||||
assert_eq!(len, expected_len);
|
||||
assert_eq!(req.subject.0, ::Method::Get);
|
||||
assert_eq!(req.subject.1, "/echo");
|
||||
@@ -358,7 +357,7 @@ mod tests {
|
||||
let _ = pretty_env_logger::init();
|
||||
let mut raw = BytesMut::from(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec());
|
||||
let expected_len = raw.len();
|
||||
let (req, len) = parse::<http::ClientTransaction, _>(&mut raw).unwrap().unwrap();
|
||||
let (req, len) = ClientTransaction::parse(&mut raw).unwrap().unwrap();
|
||||
assert_eq!(len, expected_len);
|
||||
assert_eq!(req.subject.0, 200);
|
||||
assert_eq!(req.subject.1, "OK");
|
||||
@@ -370,16 +369,16 @@ mod tests {
|
||||
#[test]
|
||||
fn test_parse_request_errors() {
|
||||
let mut raw = BytesMut::from(b"GET htt:p// HTTP/1.1\r\nHost: hyper.rs\r\n\r\n".to_vec());
|
||||
parse::<http::ServerTransaction, _>(&mut raw).unwrap_err();
|
||||
ServerTransaction::parse(&mut raw).unwrap_err();
|
||||
}
|
||||
#[test]
|
||||
fn test_parse_raw_status() {
|
||||
let mut raw = BytesMut::from(b"HTTP/1.1 200 OK\r\n\r\n".to_vec());
|
||||
let (res, _) = parse::<http::ClientTransaction, _>(&mut raw).unwrap().unwrap();
|
||||
let (res, _) = ClientTransaction::parse(&mut raw).unwrap().unwrap();
|
||||
assert_eq!(res.subject.1, "OK");
|
||||
|
||||
let mut raw = BytesMut::from(b"HTTP/1.1 200 Howdy\r\n\r\n".to_vec());
|
||||
let (res, _) = parse::<http::ClientTransaction, _>(&mut raw).unwrap().unwrap();
|
||||
let (res, _) = ClientTransaction::parse(&mut raw).unwrap().unwrap();
|
||||
assert_eq!(res.subject.1, "Howdy");
|
||||
}
|
||||
|
||||
@@ -412,7 +411,7 @@ mod tests {
|
||||
|
||||
b.bytes = len as u64;
|
||||
b.iter(|| {
|
||||
parse::<http::ServerTransaction, _>(&mut raw).unwrap();
|
||||
ServerTransaction::parse(&mut raw).unwrap();
|
||||
restart(&mut raw, len);
|
||||
});
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::ptr;
|
||||
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use http::{Http1Transaction, h1, MessageHead, ParseResult, DebugTruncate};
|
||||
use http::{Http1Transaction, MessageHead, DebugTruncate};
|
||||
use bytes::{BytesMut, Bytes};
|
||||
|
||||
const INIT_BUFFER_SIZE: usize = 8192;
|
||||
@@ -56,7 +56,7 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
|
||||
|
||||
pub fn parse<S: Http1Transaction>(&mut self) -> ::Result<Option<MessageHead<S::Incoming>>> {
|
||||
loop {
|
||||
match try!(parse::<S, _>(&mut self.read_buf)) {
|
||||
match try!(S::parse(&mut self.read_buf)) {
|
||||
Some(head) => {
|
||||
//trace!("parsed {} bytes out of {}", len, self.read_buf.len());
|
||||
return Ok(Some(head.0))
|
||||
@@ -68,7 +68,6 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
|
||||
}
|
||||
},
|
||||
}
|
||||
self.reserve_read_buf();
|
||||
match self.read_from_io() {
|
||||
Ok(0) => {
|
||||
trace!("parse eof");
|
||||
@@ -88,8 +87,17 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
|
||||
|
||||
fn read_from_io(&mut self) -> io::Result<usize> {
|
||||
use bytes::BufMut;
|
||||
// TODO: Investigate if we still need these unsafe blocks
|
||||
if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE {
|
||||
self.read_buf.reserve(INIT_BUFFER_SIZE);
|
||||
unsafe { // Zero out unused memory
|
||||
let buf = self.read_buf.bytes_mut();
|
||||
let len = buf.len();
|
||||
ptr::write_bytes(buf.as_mut_ptr(), 0, len);
|
||||
}
|
||||
}
|
||||
self.read_blocked = false;
|
||||
unsafe {
|
||||
unsafe { // Can we use AsyncRead::read_buf instead?
|
||||
let n = match self.io.read(self.read_buf.bytes_mut()) {
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
@@ -104,19 +112,6 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn reserve_read_buf(&mut self) {
|
||||
use bytes::BufMut;
|
||||
if self.read_buf.remaining_mut() >= INIT_BUFFER_SIZE {
|
||||
return
|
||||
}
|
||||
self.read_buf.reserve(INIT_BUFFER_SIZE);
|
||||
unsafe {
|
||||
let buf = self.read_buf.bytes_mut();
|
||||
let len = buf.len();
|
||||
ptr::write_bytes(buf.as_mut_ptr(), 0, len);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn buffer<B: AsRef<[u8]>>(&mut self, buf: B) -> usize {
|
||||
self.write_buf.buffer(buf.as_ref())
|
||||
}
|
||||
@@ -151,10 +146,6 @@ impl<T: Write> Write for Buffered<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse<T: Http1Transaction<Incoming=I>, I>(rdr: &mut BytesMut) -> ParseResult<I> {
|
||||
h1::parse::<T, I>(rdr)
|
||||
}
|
||||
|
||||
pub trait MemRead {
|
||||
fn read_mem(&mut self, len: usize) -> io::Result<Bytes>;
|
||||
}
|
||||
@@ -167,7 +158,6 @@ impl<T: AsyncRead + AsyncWrite> MemRead for Buffered<T> {
|
||||
trace!("Buffered.read_mem read_buf is not empty, slicing {}", n);
|
||||
Ok(self.read_buf.split_to(n).freeze())
|
||||
} else {
|
||||
self.reserve_read_buf();
|
||||
let n = try!(self.read_from_io());
|
||||
Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user