diff --git a/src/http/h1/mod.rs b/src/http/h1/mod.rs index 12101444..9bf47629 100644 --- a/src/http/h1/mod.rs +++ b/src/http/h1/mod.rs @@ -1,118 +1,7 @@ pub use self::decode::Decoder; pub use self::encode::Encoder; -pub use self::parse::parse; - mod decode; mod encode; pub mod parse; -/* -fn should_have_response_body(method: &Method, status: u16) -> bool { - trace!("should_have_response_body({:?}, {})", method, status); - match (method, status) { - (&Method::Head, _) | - (_, 100...199) | - (_, 204) | - (_, 304) | - (&Method::Connect, 200...299) => false, - _ => true - } -} -*/ -/* -const MAX_INVALID_RESPONSE_BYTES: usize = 1024 * 128; -impl HttpMessage for Http11Message { - - fn get_incoming(&mut self) -> ::Result { - unimplemented!(); - /* - try!(self.flush_outgoing()); - let stream = match self.stream.take() { - Some(stream) => stream, - None => { - // The message was already in the reading state... - // TODO Decide what happens in case we try to get a new incoming at that point - return Err(From::from( - io::Error::new(io::ErrorKind::Other, - "Read already in progress"))); - } - }; - - let expected_no_content = stream.previous_response_expected_no_content(); - trace!("previous_response_expected_no_content = {}", expected_no_content); - - let mut stream = BufReader::new(stream); - - let mut invalid_bytes_read = 0; - let head; - loop { - head = match parse_response(&mut stream) { - Ok(head) => head, - Err(::Error::Version) - if expected_no_content && invalid_bytes_read < MAX_INVALID_RESPONSE_BYTES => { - trace!("expected_no_content, found content"); - invalid_bytes_read += 1; - stream.consume(1); - continue; - } - Err(e) => { - self.stream = Some(stream.into_inner()); - return Err(e); - } - }; - break; - } - - let raw_status = head.subject; - let headers = head.headers; - - let method = self.method.take().unwrap_or(Method::Get); - - let is_empty = !should_have_response_body(&method, raw_status.0); - stream.get_mut().set_previous_response_expected_no_content(is_empty); - // According to https://tools.ietf.org/html/rfc7230#section-3.3.3 - // 1. HEAD responses, and Status 1xx, 204, and 304 cannot have a body. - // 2. Status 2xx to a CONNECT cannot have a body. - // 3. Transfer-Encoding: chunked has a chunked body. - // 4. If multiple differing Content-Length headers or invalid, close connection. - // 5. Content-Length header has a sized body. - // 6. Not Client. - // 7. Read till EOF. - self.reader = Some(if is_empty { - SizedReader(stream, 0) - } else { - if let Some(&TransferEncoding(ref codings)) = headers.get() { - if codings.last() == Some(&Chunked) { - ChunkedReader(stream, None) - } else { - trace!("not chunked. read till eof"); - EofReader(stream) - } - } else if let Some(&ContentLength(len)) = headers.get() { - SizedReader(stream, len) - } else if headers.has::() { - trace!("illegal Content-Length: {:?}", headers.get_raw("Content-Length")); - return Err(Error::Header); - } else { - trace!("neither Transfer-Encoding nor Content-Length"); - EofReader(stream) - } - }); - - trace!("Http11Message.reader = {:?}", self.reader); - - - Ok(ResponseHead { - headers: headers, - raw_status: raw_status, - version: head.version, - }) - */ - } -} - - -*/ - - diff --git a/src/http/h1/parse.rs b/src/http/h1/parse.rs index 8a952099..9ad0f7fa 100644 --- a/src/http/h1/parse.rs +++ b/src/http/h1/parse.rs @@ -15,19 +15,15 @@ use version::HttpVersion::{Http10, Http11}; const MAX_HEADERS: usize = 100; const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific -pub fn parse, I>(buf: &mut BytesMut) -> ParseResult { - if buf.len() == 0 { - return Ok(None); - } - trace!("parse({:?})", buf); - ::parse(buf) -} - impl Http1Transaction for ServerTransaction { type Incoming = RequestLine; type Outgoing = StatusCode; fn parse(buf: &mut BytesMut) -> ParseResult { + if buf.len() == 0 { + return Ok(None); + } + trace!("parse({:?})", buf); let mut headers_indices = [HeaderIndices { name: (0, 0), value: (0, 0) @@ -145,6 +141,10 @@ impl Http1Transaction for ClientTransaction { type Outgoing = RequestLine; fn parse(buf: &mut BytesMut) -> ParseResult { + if buf.len() == 0 { + return Ok(None); + } + trace!("parse({:?})", buf); let mut headers_indices = [HeaderIndices { name: (0, 0), value: (0, 0) @@ -332,9 +332,8 @@ fn extend(dst: &mut Vec, data: &[u8]) { #[cfg(test)] mod tests { - use http; + use http::{ServerTransaction, ClientTransaction, Http1Transaction}; use bytes::BytesMut; - use super::{parse}; #[test] fn test_parse_request() { @@ -342,7 +341,7 @@ mod tests { let _ = pretty_env_logger::init(); let mut raw = BytesMut::from(b"GET /echo HTTP/1.1\r\nHost: hyper.rs\r\n\r\n".to_vec()); let expected_len = raw.len(); - let (req, len) = parse::(&mut raw).unwrap().unwrap(); + let (req, len) = ServerTransaction::parse(&mut raw).unwrap().unwrap(); assert_eq!(len, expected_len); assert_eq!(req.subject.0, ::Method::Get); assert_eq!(req.subject.1, "/echo"); @@ -358,7 +357,7 @@ mod tests { let _ = pretty_env_logger::init(); let mut raw = BytesMut::from(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec()); let expected_len = raw.len(); - let (req, len) = parse::(&mut raw).unwrap().unwrap(); + let (req, len) = ClientTransaction::parse(&mut raw).unwrap().unwrap(); assert_eq!(len, expected_len); assert_eq!(req.subject.0, 200); assert_eq!(req.subject.1, "OK"); @@ -370,16 +369,16 @@ mod tests { #[test] fn test_parse_request_errors() { let mut raw = BytesMut::from(b"GET htt:p// HTTP/1.1\r\nHost: hyper.rs\r\n\r\n".to_vec()); - parse::(&mut raw).unwrap_err(); + ServerTransaction::parse(&mut raw).unwrap_err(); } #[test] fn test_parse_raw_status() { let mut raw = BytesMut::from(b"HTTP/1.1 200 OK\r\n\r\n".to_vec()); - let (res, _) = parse::(&mut raw).unwrap().unwrap(); + let (res, _) = ClientTransaction::parse(&mut raw).unwrap().unwrap(); assert_eq!(res.subject.1, "OK"); let mut raw = BytesMut::from(b"HTTP/1.1 200 Howdy\r\n\r\n".to_vec()); - let (res, _) = parse::(&mut raw).unwrap().unwrap(); + let (res, _) = ClientTransaction::parse(&mut raw).unwrap().unwrap(); assert_eq!(res.subject.1, "Howdy"); } @@ -412,7 +411,7 @@ mod tests { b.bytes = len as u64; b.iter(|| { - parse::(&mut raw).unwrap(); + ServerTransaction::parse(&mut raw).unwrap(); restart(&mut raw, len); }); diff --git a/src/http/io.rs b/src/http/io.rs index dd82bc04..ac6e42b0 100644 --- a/src/http/io.rs +++ b/src/http/io.rs @@ -5,7 +5,7 @@ use std::ptr; use tokio_io::{AsyncRead, AsyncWrite}; -use http::{Http1Transaction, h1, MessageHead, ParseResult, DebugTruncate}; +use http::{Http1Transaction, MessageHead, DebugTruncate}; use bytes::{BytesMut, Bytes}; const INIT_BUFFER_SIZE: usize = 8192; @@ -56,7 +56,7 @@ impl Buffered { pub fn parse(&mut self) -> ::Result>> { loop { - match try!(parse::(&mut self.read_buf)) { + match try!(S::parse(&mut self.read_buf)) { Some(head) => { //trace!("parsed {} bytes out of {}", len, self.read_buf.len()); return Ok(Some(head.0)) @@ -68,7 +68,6 @@ impl Buffered { } }, } - self.reserve_read_buf(); match self.read_from_io() { Ok(0) => { trace!("parse eof"); @@ -88,8 +87,17 @@ impl Buffered { fn read_from_io(&mut self) -> io::Result { use bytes::BufMut; + // TODO: Investigate if we still need these unsafe blocks + if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE { + self.read_buf.reserve(INIT_BUFFER_SIZE); + unsafe { // Zero out unused memory + let buf = self.read_buf.bytes_mut(); + let len = buf.len(); + ptr::write_bytes(buf.as_mut_ptr(), 0, len); + } + } self.read_blocked = false; - unsafe { + unsafe { // Can we use AsyncRead::read_buf instead? let n = match self.io.read(self.read_buf.bytes_mut()) { Ok(n) => n, Err(e) => { @@ -104,19 +112,6 @@ impl Buffered { } } - fn reserve_read_buf(&mut self) { - use bytes::BufMut; - if self.read_buf.remaining_mut() >= INIT_BUFFER_SIZE { - return - } - self.read_buf.reserve(INIT_BUFFER_SIZE); - unsafe { - let buf = self.read_buf.bytes_mut(); - let len = buf.len(); - ptr::write_bytes(buf.as_mut_ptr(), 0, len); - } - } - pub fn buffer>(&mut self, buf: B) -> usize { self.write_buf.buffer(buf.as_ref()) } @@ -151,10 +146,6 @@ impl Write for Buffered { } } -fn parse, I>(rdr: &mut BytesMut) -> ParseResult { - h1::parse::(rdr) -} - pub trait MemRead { fn read_mem(&mut self, len: usize) -> io::Result; } @@ -167,7 +158,6 @@ impl MemRead for Buffered { trace!("Buffered.read_mem read_buf is not empty, slicing {}", n); Ok(self.read_buf.split_to(n).freeze()) } else { - self.reserve_read_buf(); let n = try!(self.read_from_io()); Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()) }