diff --git a/Cargo.toml b/Cargo.toml index e8c1bd7d..aacf0465 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,8 @@ include = [ ] [dependencies] -base64 = "0.4.0" +base64 = "0.4" +bytes = "0.4" futures = "0.1.7" futures-cpupool = "0.1" httparse = "1.0" diff --git a/src/header/common/link.rs b/src/header/common/link.rs index 9f718b80..d6cafa60 100644 --- a/src/header/common/link.rs +++ b/src/header/common/link.rs @@ -933,7 +933,7 @@ mod tests { use header::Header; use http::{ServerTransaction, Http1Transaction}; - use http::buf::MemBuf; + use bytes::BytesMut; use mime::Mime; use mime::TopLevel::Text; @@ -1018,7 +1018,7 @@ mod tests { let expected_link = Link::new(vec![first_link, second_link, third_link]); - let raw = MemBuf::from(b"GET /super_short_uri/and_whatever HTTP/1.1\r\nHost: \ + let mut raw = BytesMut::from(b"GET /super_short_uri/and_whatever HTTP/1.1\r\nHost: \ hyper.rs\r\nAccept: a lot of things\r\nAccept-Charset: \ utf8\r\nAccept-Encoding: *\r\nLink: ; \ rel=\"previous\"; title*=UTF-8'de'letztes%20Kapitel, \ @@ -1029,7 +1029,7 @@ mod tests { rel=\"previous\"; rev=next; title=\"previous chapter\"\ \r\n\r\n".to_vec()); - let (mut res, _) = ServerTransaction::parse(&raw).unwrap().unwrap(); + let (mut res, _) = ServerTransaction::parse(&mut raw).unwrap().unwrap(); let link = res.headers.remove::().unwrap(); diff --git a/src/header/mod.rs b/src/header/mod.rs index ab5027b4..f538d750 100644 --- a/src/header/mod.rs +++ b/src/header/mod.rs @@ -87,7 +87,7 @@ use self::internals::{Item, VecMap, Entry}; pub use self::shared::*; pub use self::common::*; pub use self::raw::Raw; -use http::buf::MemSlice; +use bytes::Bytes; mod common; mod internals; @@ -611,8 +611,8 @@ impl<'a> Extend> for Headers { } } -impl<'a> Extend<(&'a str, MemSlice)> for Headers { - fn extend>(&mut self, iter: I) { +impl<'a> Extend<(&'a str, Bytes)> for Headers { + fn extend>(&mut self, iter: I) { for (name, value) in iter { let name = HeaderName(UniCase(maybe_literal(name))); //let trim = header.value.iter().rev().take_while(|&&x| x == b' ').count(); diff --git a/src/header/raw.rs b/src/header/raw.rs index c122b3fb..2b03701c 100644 --- a/src/header/raw.rs +++ b/src/header/raw.rs @@ -1,6 +1,6 @@ use std::borrow::Cow; use std::fmt; -use http::buf::MemSlice; +use bytes::Bytes; /// A raw header value. #[derive(Clone, PartialEq, Eq)] @@ -72,7 +72,7 @@ enum Lines { enum Line { Static(&'static [u8]), Owned(Vec), - Shared(MemSlice), + Shared(Bytes), } fn eq, B: AsRef<[u8]>>(a: &[A], b: &[B]) -> bool { @@ -152,9 +152,9 @@ impl<'a> From<&'a [u8]> for Raw { } } -impl From for Raw { +impl From for Raw { #[inline] - fn from(val: MemSlice) -> Raw { + fn from(val: Bytes) -> Raw { Raw(Lines::One(Line::Shared(val))) } } @@ -166,9 +166,9 @@ impl From> for Line { } } -impl From for Line { +impl From for Line { #[inline] - fn from(val: MemSlice) -> Line { + fn from(val: Bytes) -> Line { Line::Shared(val) } } @@ -183,11 +183,11 @@ impl AsRef<[u8]> for Line { } } -pub fn parsed(val: MemSlice) -> Raw { +pub fn parsed(val: Bytes) -> Raw { Raw(Lines::One(From::from(val))) } -pub fn push(raw: &mut Raw, val: MemSlice) { +pub fn push(raw: &mut Raw, val: Bytes) { raw.push_line(Line::from(val)); } diff --git a/src/http/body.rs b/src/http/body.rs index 7797b5f1..f98d5931 100644 --- a/src/http/body.rs +++ b/src/http/body.rs @@ -1,9 +1,9 @@ -use std::convert::From; - -use tokio_proto; -use http::Chunk; +use bytes::Bytes; use futures::{Poll, Stream}; use futures::sync::mpsc; +use tokio_proto; + +use http::Chunk; pub type TokioBody = tokio_proto::streaming::Body; @@ -58,6 +58,12 @@ impl From for Body { } } +impl From for Body { + fn from (bytes: Bytes) -> Body { + Body(TokioBody::from(Chunk::from(bytes))) + } +} + impl From> for Body { fn from (vec: Vec) -> Body { Body(TokioBody::from(Chunk::from(vec))) diff --git a/src/http/buf.rs b/src/http/buf.rs deleted file mode 100644 index 5f3daba7..00000000 --- a/src/http/buf.rs +++ /dev/null @@ -1,432 +0,0 @@ -use std::borrow::Cow; -use std::cell::{Cell, UnsafeCell}; -use std::fmt; -use std::io::{self, Read}; -use std::ops::{Index, Range, RangeFrom, RangeTo, RangeFull}; -use std::ptr; -use std::str; -use std::sync::Arc; - -pub struct MemBuf { - buf: Arc>>, - start: Cell, - end: usize, -} - -impl MemBuf { - pub fn new() -> MemBuf { - MemBuf::with_capacity(0) - } - - pub fn with_capacity(cap: usize) -> MemBuf { - MemBuf { - buf: Arc::new(UnsafeCell::new(vec![0; cap])), - start: Cell::new(0), - end: 0, - } - } - - pub fn bytes(&self) -> &[u8] { - &self.buf()[self.start.get()..self.end] - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn len(&self) -> usize { - self.end - self.start.get() - } - - pub fn capacity(&self) -> usize { - self.buf().len() - } - - pub fn read_from(&mut self, io: &mut R) -> io::Result { - let start = self.end - self.start.get(); - let n = try!(io.read(&mut self.buf_mut()[start..])); - self.end += n; - Ok(n) - } - - pub fn slice(&self, len: usize) -> MemSlice { - let start = self.start.get(); - assert!(!(self.end - start < len)); - let end = start + len; - self.start.set(end); - MemSlice { - buf: self.buf.clone(), - start: start, - end: end, - } - } - - pub fn reserve(&mut self, needed: usize) { - let orig_cap = self.capacity(); - let remaining = orig_cap - self.end; - if remaining >= needed { - // all done - return - } - let is_unique = Arc::get_mut(&mut self.buf).is_some(); - trace!("MemBuf::reserve {} access", if is_unique { "unique" } else { "shared" }); - if is_unique && remaining + self.start.get() >= needed { - // we have unique access, we can mutate this vector - trace!("MemBuf::reserve unique access, shifting"); - unsafe { - let mut buf = &mut *self.buf.get(); - let len = self.len(); - ptr::copy( - buf.as_ptr().offset(self.start.get() as isize), - buf.as_mut_ptr(), - len - ); - self.start.set(0); - self.end = len; - } - } else if is_unique { - // we have unique access, we can mutate this vector - trace!("MemBuf::reserve unique access, growing"); - unsafe { - let mut vec = &mut *self.buf.get(); - grow_zerofill(vec, needed); - } - } else { - // we need to allocate more space, but don't have unique - // access, so we need to make a new buffer - trace!("MemBuf::reserve shared buffer, creating new"); - let mut new = MemBuf::with_capacity(needed); - unsafe { - ptr::copy_nonoverlapping( - self.bytes().as_ptr(), - new.buf_mut().as_mut_ptr(), - self.len() - ); - } - new.end = self.len(); - *self = new; - } - } - - pub fn reset(&mut self) { - match Arc::get_mut(&mut self.buf) { - Some(_) => { - trace!("MemBuf::reset was unique, re-using"); - self.start.set(0); - self.end = 0; - }, - None => { - trace!("MemBuf::reset not unique, creating new MemBuf"); - *self = MemBuf::with_capacity(self.buf().len()); - } - } - } - - #[cfg(all(feature = "nightly", test))] - pub fn restart(&mut self) { - Arc::get_mut(&mut self.buf).unwrap(); - self.start.set(0); - } - - fn buf_mut(&mut self) -> &mut [u8] { - // The contract here is that we NEVER have a MemSlice that exists - // with slice.end > self.start. - // In other words, we should *ALWAYS* be the only instance that can - // look at the bytes on the right side of self.start. - unsafe { - &mut (*self.buf.get())[self.start.get()..] - } - } - - fn buf(&self) -> &Vec { - unsafe { - &*self.buf.get() - } - } -} - -#[inline] -unsafe fn grow_zerofill(buf: &mut Vec, additional: usize) { - let orig_cap = buf.capacity(); - buf.reserve(additional); - let new_cap = buf.capacity(); - let reserved = new_cap - orig_cap; - let orig_len = buf.len(); - zero(buf, orig_len, reserved); - buf.set_len(orig_len + reserved); - - - unsafe fn zero(buf: &mut Vec, offset: usize, len: usize) { - assert!(buf.capacity() >= len + offset, - "offset of {} with len of {} is bigger than capacity of {}", - offset, len, buf.capacity()); - ptr::write_bytes(buf.as_mut_ptr().offset(offset as isize), 0, len); - } -} - -#[test] -fn test_grow_zerofill() { - for init in 0..100 { - for reserve in (0..100).rev() { - let mut vec = vec![0; init]; - unsafe { grow_zerofill(&mut vec, reserve) } - assert_eq!(vec.len(), vec.capacity()); - } - } -} - -impl fmt::Debug for MemBuf { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("MemBuf") - .field("start", &self.start.get()) - .field("end", &self.end) - .field("buf", &&self.buf()[self.start.get()..self.end]) - .finish() - } -} - -impl From> for MemBuf { - fn from(mut vec: Vec) -> MemBuf { - let end = vec.iter().find(|&&x| x == 0).map(|&x| x as usize).unwrap_or(vec.len()); - vec.shrink_to_fit(); - MemBuf { - buf: Arc::new(UnsafeCell::new(vec)), - start: Cell::new(0), - end: end, - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct MemStr(MemSlice); - -impl MemStr { - pub unsafe fn from_utf8_unchecked(slice: MemSlice) -> MemStr { - MemStr(slice) - } - - pub fn as_str(&self) -> &str { - unsafe { str::from_utf8_unchecked(self.0.as_ref()) } - } -} - -pub struct MemSlice { - buf: Arc>>, - start: usize, - end: usize, -} - -impl MemSlice { - pub fn empty() -> MemSlice { - MemSlice { - buf: Arc::new(UnsafeCell::new(Vec::new())), - start: 0, - end: 0, - } - } - - pub fn len(&self) -> usize { - self.get().len() - } - - pub fn is_empty(&self) -> bool { - self.get().is_empty() - } - - pub fn slice(&self, range: S) -> MemSlice { - range.slice(self) - } - - fn get(&self) -> &[u8] { - unsafe { &(*self.buf.get())[self.start..self.end] } - } -} - -impl AsRef<[u8]> for MemSlice { - fn as_ref(&self) -> &[u8] { - self.get() - } -} - -impl fmt::Debug for MemSlice { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(&self.get(), f) - } -} - -impl Index for MemSlice { - type Output = u8; - fn index(&self, i: usize) -> &u8 { - &self.get()[i] - } -} - -impl<'a> From<&'a [u8]> for MemSlice { - fn from(v: &'a [u8]) -> MemSlice { - MemSlice { - buf: Arc::new(UnsafeCell::new(v.to_vec())), - start: 0, - end: v.len(), - } - } -} - -impl From> for MemSlice { - fn from(v: Vec) -> MemSlice { - let len = v.len(); - MemSlice { - buf: Arc::new(UnsafeCell::new(v)), - start: 0, - end: len, - } - } -} - -impl<'a> From<&'a str> for MemSlice { - fn from(v: &'a str) -> MemSlice { - let v = v.as_bytes(); - MemSlice { - buf: Arc::new(UnsafeCell::new(v.to_vec())), - start: 0, - end: v.len(), - } - } -} - -impl<'a> From> for MemSlice { - fn from(v: Cow<'a, [u8]>) -> MemSlice { - let v = v.into_owned(); - let len = v.len(); - MemSlice { - buf: Arc::new(UnsafeCell::new(v)), - start: 0, - end: len, - } - } -} - -impl PartialEq for MemSlice { - fn eq(&self, other: &MemSlice) -> bool { - self.get() == other.get() - } -} - -impl PartialEq<[u8]> for MemSlice { - fn eq(&self, other: &[u8]) -> bool { - self.get() == other - } -} - -impl PartialEq for MemSlice { - fn eq(&self, other: &str) -> bool { - self.get() == other.as_bytes() - } -} - -impl PartialEq> for MemSlice { - fn eq(&self, other: &Vec) -> bool { - self.get() == other.as_slice() - } -} - -impl Eq for MemSlice {} - -impl Clone for MemSlice { - fn clone(&self) -> MemSlice { - MemSlice { - buf: self.buf.clone(), - start: self.start, - end: self.end, - } - } -} - -pub trait Slice { - fn slice(self, subject: &MemSlice) -> MemSlice; -} - - -impl Slice for Range { - fn slice(self, subject: &MemSlice) -> MemSlice { - assert!(subject.start + self.start <= subject.end); - assert!(subject.start + self.end <= subject.end); - MemSlice { - buf: subject.buf.clone(), - start: subject.start + self.start, - end: subject.start + self.end, - } - } -} - -impl Slice for RangeFrom { - fn slice(self, subject: &MemSlice) -> MemSlice { - assert!(subject.start + self.start <= subject.end); - MemSlice { - buf: subject.buf.clone(), - start: subject.start + self.start, - end: subject.end, - } - } -} - -impl Slice for RangeTo { - fn slice(self, subject: &MemSlice) -> MemSlice { - assert!(subject.start + self.end <= subject.end); - MemSlice { - buf: subject.buf.clone(), - start: subject.start, - end: subject.start + self.end, - } - } -} - -impl Slice for RangeFull { - fn slice(self, subject: &MemSlice) -> MemSlice { - MemSlice { - buf: subject.buf.clone(), - start: subject.start, - end: subject.end, - } - } -} - -unsafe impl Send for MemBuf {} -unsafe impl Send for MemSlice {} -unsafe impl Sync for MemSlice {} - -#[cfg(test)] -impl ::http::io::MemRead for ::mock::AsyncIo { - fn read_mem(&mut self, len: usize) -> io::Result { - let mut v = vec![0; len]; - let n = try!(self.read(v.as_mut_slice())); - v.truncate(n); - Ok(MemSlice { - buf: Arc::new(UnsafeCell::new(v)), - start: 0, - end: n, - }) - } -} - -#[cfg(test)] -mod tests { - use super::{MemBuf}; - - #[test] - fn test_mem_slice_slice() { - let buf = MemBuf::from(b"Hello World".to_vec()); - - let len = buf.len(); - let full = buf.slice(len); - - assert_eq!(full.as_ref(), b"Hello World"); - assert_eq!(full.slice(6..).as_ref(), b"World"); - assert_eq!(full.slice(..5).as_ref(), b"Hello"); - assert_eq!(full.slice(..).as_ref(), b"Hello World"); - for a in 0..len { - for b in a..len { - assert_eq!(full.slice(a..b).as_ref(), &b"Hello World"[a..b], "{}..{}", a, b); - } - } - } -} diff --git a/src/http/chunk.rs b/src/http/chunk.rs index 5f3ec626..5e2216f3 100644 --- a/src/http/chunk.rs +++ b/src/http/chunk.rs @@ -1,27 +1,25 @@ use std::fmt; -use http::buf::MemSlice; +use bytes::Bytes; /// A piece of a message body. pub struct Chunk(Inner); enum Inner { - Owned(Vec), - Mem(MemSlice), - Static(&'static [u8]), + Shared(Bytes), } impl From> for Chunk { #[inline] fn from(v: Vec) -> Chunk { - Chunk(Inner::Owned(v)) + Chunk::from(Bytes::from(v)) } } impl From<&'static [u8]> for Chunk { #[inline] fn from(slice: &'static [u8]) -> Chunk { - Chunk(Inner::Static(slice)) + Chunk::from(Bytes::from_static(slice)) } } @@ -39,9 +37,9 @@ impl From<&'static str> for Chunk { } } -impl From for Chunk { - fn from(mem: MemSlice) -> Chunk { - Chunk(Inner::Mem(mem)) +impl From for Chunk { + fn from(mem: Bytes) -> Chunk { + Chunk(Inner::Shared(mem)) } } @@ -58,9 +56,7 @@ impl AsRef<[u8]> for Chunk { #[inline] fn as_ref(&self) -> &[u8] { match self.0 { - Inner::Owned(ref vec) => vec, - Inner::Mem(ref slice) => slice.as_ref(), - Inner::Static(slice) => slice, + Inner::Shared(ref slice) => slice, } } } diff --git a/src/http/h1/decode.rs b/src/http/h1/decode.rs index 1c226cce..89a41b0e 100644 --- a/src/http/h1/decode.rs +++ b/src/http/h1/decode.rs @@ -1,7 +1,7 @@ use std::usize; use std::io; -use http::buf::MemSlice; +use bytes::Bytes; use http::io::MemRead; use self::Kind::{Length, Chunked, Eof}; @@ -79,12 +79,12 @@ impl Decoder { } impl Decoder { - pub fn decode(&mut self, body: &mut R) -> io::Result { + pub fn decode(&mut self, body: &mut R) -> io::Result { match self.kind { Length(ref mut remaining) => { trace!("Sized read, remaining={:?}", remaining); if *remaining == 0 { - Ok(MemSlice::empty()) + Ok(Bytes::new()) } else { let to_read = *remaining as usize; let buf = try!(body.read_mem(to_read)); @@ -107,7 +107,7 @@ impl Decoder { *state = try!(state.step(body, size, &mut buf)); if *state == ChunkedState::End { trace!("end of chunked"); - return Ok(MemSlice::empty()); + return Ok(Bytes::new()); } if let Some(buf) = buf { return Ok(buf); @@ -116,7 +116,7 @@ impl Decoder { } Eof(ref mut is_eof) => { if *is_eof { - Ok(MemSlice::empty()) + Ok(Bytes::new()) } else { // 8192 chosen because its about 2 packets, there probably // won't be that much available, so don't have MemReaders @@ -150,7 +150,7 @@ impl ChunkedState { fn step(&self, body: &mut R, size: &mut u64, - buf: &mut Option) + buf: &mut Option) -> io::Result { use self::ChunkedState::*; Ok(match *self { @@ -223,7 +223,7 @@ impl ChunkedState { fn read_body(rdr: &mut R, rem: &mut u64, - buf: &mut Option) + buf: &mut Option) -> io::Result { trace!("Chunked read, remaining={:?}", rem); @@ -285,18 +285,19 @@ mod tests { use super::Decoder; use super::ChunkedState; use http::io::MemRead; - use http::buf::{MemBuf, MemSlice}; + use bytes::{BytesMut, Bytes}; use mock::AsyncIo; impl<'a> MemRead for &'a [u8] { - fn read_mem(&mut self, len: usize) -> io::Result { + fn read_mem(&mut self, len: usize) -> io::Result { let n = ::std::cmp::min(len, self.len()); if n > 0 { - let mut buf = MemBuf::with_capacity(n); - buf.read_from(self).unwrap(); - Ok(buf.slice(n)) + let (a, b) = self.split_at(n); + let mut buf = BytesMut::from(a); + *self = b; + Ok(buf.drain_to(n).freeze()) } else { - Ok(MemSlice::empty()) + Ok(Bytes::new()) } } } diff --git a/src/http/h1/parse.rs b/src/http/h1/parse.rs index 78046034..947813ab 100644 --- a/src/http/h1/parse.rs +++ b/src/http/h1/parse.rs @@ -2,11 +2,11 @@ use std::borrow::Cow; use std::fmt::{self, Write}; use httparse; +use bytes::{BytesMut, Bytes}; use header::{self, Headers, ContentLength, TransferEncoding}; -use http::{MessageHead, RawStatus, Http1Transaction, ParseResult, ServerTransaction, ClientTransaction, RequestLine}; +use http::{ByteStr, MessageHead, RawStatus, Http1Transaction, ParseResult, ServerTransaction, ClientTransaction, RequestLine}; use http::h1::{Encoder, Decoder}; -use http::buf::{MemBuf, MemSlice, MemStr}; use method::Method; use status::StatusCode; use version::HttpVersion::{Http10, Http11}; @@ -14,7 +14,7 @@ use version::HttpVersion::{Http10, Http11}; const MAX_HEADERS: usize = 100; const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific -pub fn parse, I>(buf: &MemBuf) -> ParseResult { +pub fn parse, I>(buf: &mut BytesMut) -> ParseResult { if buf.len() == 0 { return Ok(None); } @@ -26,38 +26,54 @@ impl Http1Transaction for ServerTransaction { type Incoming = RequestLine; type Outgoing = StatusCode; - fn parse(buf: &MemBuf) -> ParseResult { - let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS]; - trace!("Request.parse([Header; {}], [u8; {}])", headers.len(), buf.len()); - let mut req = httparse::Request::new(&mut headers); - Ok(match try!(req.parse(buf.bytes())) { - httparse::Status::Complete(len) => { - trace!("Request.parse Complete({})", len); - let slice = buf.slice(len); - let path = req.path.unwrap(); - let path_start = path.as_ptr() as usize - slice.as_ref().as_ptr() as usize; - let path_end = path_start + path.len(); - let path = slice.slice(path_start..path_end); - // path was found to be utf8 by httparse - let path = unsafe { MemStr::from_utf8_unchecked(path) }; - let subject = RequestLine( - try!(req.method.unwrap().parse()), - try!(::uri::from_mem_str(path)), - ); - let mut headers = Headers::with_capacity(req.headers.len()); - headers.extend(HeadersAsMemSliceIter { - headers: req.headers.iter(), - slice: slice, - }); + fn parse(buf: &mut BytesMut) -> ParseResult { + let mut headers_indices = [HeaderIndices { + name: (0, 0), + value: (0, 0) + }; MAX_HEADERS]; + let (len, method, path, version, headers_len) = { + let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS]; + trace!("Request.parse([Header; {}], [u8; {}])", headers.len(), buf.len()); + let mut req = httparse::Request::new(&mut headers); + match try!(req.parse(&buf)) { + httparse::Status::Complete(len) => { + trace!("httparse Complete({})", len); + let method = try!(req.method.unwrap().parse()); + let path = req.path.unwrap(); + let bytes_ptr = buf.as_ref().as_ptr() as usize; + let path_start = path.as_ptr() as usize - bytes_ptr; + let path_end = path_start + path.len(); + let path = (path_start, path_end); + let version = if req.version.unwrap() == 1 { Http11 } else { Http10 }; - Some((MessageHead { - version: if req.version.unwrap() == 1 { Http11 } else { Http10 }, - subject: subject, - headers: headers, - }, len)) + record_header_indices(buf.as_ref(), &req.headers, &mut headers_indices); + let headers_len = req.headers.len(); + (len, method, path, version, headers_len) + } + httparse::Status::Partial => return Ok(None), } - httparse::Status::Partial => None, - }) + }; + + let mut headers = Headers::with_capacity(headers_len); + let slice = buf.drain_to(len).freeze(); + let path = slice.slice(path.0, path.1); + // path was found to be utf8 by httparse + let path = unsafe { ByteStr::from_utf8_unchecked(path) }; + let subject = RequestLine( + method, + try!(::uri::from_mem_str(path)), + ); + + headers.extend(HeadersAsBytesIter { + headers: headers_indices[..headers_len].iter(), + slice: slice, + }); + + Ok(Some((MessageHead { + version: version, + subject: subject, + headers: headers, + }, len))) } fn decoder(head: &MessageHead) -> ::Result { @@ -127,32 +143,44 @@ impl Http1Transaction for ClientTransaction { type Incoming = RawStatus; type Outgoing = RequestLine; - fn parse(buf: &MemBuf) -> ParseResult { - let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS]; - trace!("Response.parse([Header; {}], [u8; {}])", headers.len(), buf.len()); - let mut res = httparse::Response::new(&mut headers); - Ok(match try!(res.parse(buf.bytes())) { - httparse::Status::Complete(len) => { - trace!("Response.try_parse Complete({})", len); - let code = res.code.unwrap(); - let reason = match StatusCode::from_u16(code).canonical_reason() { - Some(reason) if reason == res.reason.unwrap() => Cow::Borrowed(reason), - _ => Cow::Owned(res.reason.unwrap().to_owned()) - }; - let mut headers = Headers::with_capacity(res.headers.len()); - let slice = buf.slice(len); - headers.extend(HeadersAsMemSliceIter { - headers: res.headers.iter(), - slice: slice, - }); - Some((MessageHead { - version: if res.version.unwrap() == 1 { Http11 } else { Http10 }, - subject: RawStatus(code, reason), - headers: headers, - }, len)) - }, - httparse::Status::Partial => None, - }) + fn parse(buf: &mut BytesMut) -> ParseResult { + let mut headers_indices = [HeaderIndices { + name: (0, 0), + value: (0, 0) + }; MAX_HEADERS]; + let (len, code, reason, version, headers_len) = { + let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS]; + trace!("Response.parse([Header; {}], [u8; {}])", headers.len(), buf.len()); + let mut res = httparse::Response::new(&mut headers); + let bytes = buf.as_ref(); + match try!(res.parse(bytes)) { + httparse::Status::Complete(len) => { + trace!("Response.try_parse Complete({})", len); + let code = res.code.unwrap(); + let reason = match StatusCode::from_u16(code).canonical_reason() { + Some(reason) if reason == res.reason.unwrap() => Cow::Borrowed(reason), + _ => Cow::Owned(res.reason.unwrap().to_owned()) + }; + let version = if res.version.unwrap() == 1 { Http11 } else { Http10 }; + record_header_indices(bytes, &res.headers, &mut headers_indices); + let headers_len = res.headers.len(); + (len, code, reason, version, headers_len) + }, + httparse::Status::Partial => return Ok(None), + } + }; + + let mut headers = Headers::with_capacity(headers_len); + let slice = buf.drain_to(len).freeze(); + headers.extend(HeadersAsBytesIter { + headers: headers_indices[..headers_len].iter(), + slice: slice, + }); + Ok(Some((MessageHead { + version: version, + subject: RawStatus(code, reason), + headers: headers, + }, len))) } fn decoder(inc: &MessageHead) -> ::Result { @@ -237,18 +265,41 @@ impl Http1Transaction for ClientTransaction { } } -struct HeadersAsMemSliceIter<'a> { - headers: ::std::slice::Iter<'a, httparse::Header<'a>>, - slice: MemSlice, +#[derive(Clone, Copy)] +struct HeaderIndices { + name: (usize, usize), + value: (usize, usize), } -impl<'a> Iterator for HeadersAsMemSliceIter<'a> { - type Item = (&'a str, MemSlice); +fn record_header_indices(bytes: &[u8], headers: &[httparse::Header], indices: &mut [HeaderIndices]) { + let bytes_ptr = bytes.as_ptr() as usize; + for (header, indices) in headers.iter().zip(indices.iter_mut()) { + let name_start = header.name.as_ptr() as usize - bytes_ptr; + let name_end = name_start + header.name.len(); + indices.name = (name_start, name_end); + let value_start = header.value.as_ptr() as usize - bytes_ptr; + let value_end = value_start + header.value.len(); + indices.value = (value_start, value_end); + } +} + +struct HeadersAsBytesIter<'a> { + headers: ::std::slice::Iter<'a, HeaderIndices>, + slice: Bytes, +} + +impl<'a> Iterator for HeadersAsBytesIter<'a> { + type Item = (&'a str, Bytes); fn next(&mut self) -> Option { self.headers.next().map(|header| { - let value_start = header.value.as_ptr() as usize - self.slice.as_ref().as_ptr() as usize; - let value_end = value_start + header.value.len(); - (header.name, self.slice.slice(value_start..value_end)) + let name = unsafe { + let bytes = ::std::slice::from_raw_parts( + self.slice.as_ref().as_ptr().offset(header.name.0 as isize), + header.name.1 - header.name.0 + ); + ::std::str::from_utf8_unchecked(bytes) + }; + (name, self.slice.slice(header.value.0, header.value.1)) }) } } @@ -281,28 +332,53 @@ fn extend(dst: &mut Vec, data: &[u8]) { #[cfg(test)] mod tests { use http; - use http::buf::MemBuf; + use bytes::BytesMut; use super::{parse}; #[test] fn test_parse_request() { - let raw = MemBuf::from(b"GET /echo HTTP/1.1\r\nHost: hyper.rs\r\n\r\n".to_vec()); - parse::(&raw).unwrap(); + extern crate pretty_env_logger; + let _ = pretty_env_logger::init(); + let mut raw = BytesMut::from(b"GET /echo HTTP/1.1\r\nHost: hyper.rs\r\n\r\n".to_vec()); + let expected_len = raw.len(); + let (req, len) = parse::(&mut raw).unwrap().unwrap(); + assert_eq!(len, expected_len); + assert_eq!(req.subject.0, ::Method::Get); + assert_eq!(req.subject.1, "/echo".parse().unwrap()); + assert_eq!(req.version, ::HttpVersion::Http11); + assert_eq!(req.headers.len(), 1); + assert_eq!(req.headers.get_raw("Host").map(|raw| &raw[0]), Some(b"hyper.rs".as_ref())); + } + + + #[test] + fn test_parse_response() { + extern crate pretty_env_logger; + let _ = pretty_env_logger::init(); + let mut raw = BytesMut::from(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec()); + let expected_len = raw.len(); + let (req, len) = parse::(&mut raw).unwrap().unwrap(); + assert_eq!(len, expected_len); + assert_eq!(req.subject.0, 200); + assert_eq!(req.subject.1, "OK"); + assert_eq!(req.version, ::HttpVersion::Http11); + assert_eq!(req.headers.len(), 1); + assert_eq!(req.headers.get_raw("Content-Length").map(|raw| &raw[0]), Some(b"0".as_ref())); } #[test] fn test_parse_request_errors() { - let raw = MemBuf::from(b"GET htt:p// HTTP/1.1\r\nHost: hyper.rs\r\n\r\n".to_vec()); - parse::(&raw).unwrap_err(); + let mut raw = BytesMut::from(b"GET htt:p// HTTP/1.1\r\nHost: hyper.rs\r\n\r\n".to_vec()); + parse::(&mut raw).unwrap_err(); } #[test] fn test_parse_raw_status() { - let raw = MemBuf::from(b"HTTP/1.1 200 OK\r\n\r\n".to_vec()); - let (res, _) = parse::(&raw).unwrap().unwrap(); + let mut raw = BytesMut::from(b"HTTP/1.1 200 OK\r\n\r\n".to_vec()); + let (res, _) = parse::(&mut raw).unwrap().unwrap(); assert_eq!(res.subject.1, "OK"); - let raw = MemBuf::from(b"HTTP/1.1 200 Howdy\r\n\r\n".to_vec()); - let (res, _) = parse::(&raw).unwrap().unwrap(); + let mut raw = BytesMut::from(b"HTTP/1.1 200 Howdy\r\n\r\n".to_vec()); + let (res, _) = parse::(&mut raw).unwrap().unwrap(); assert_eq!(res.subject.1, "Howdy"); } @@ -312,26 +388,39 @@ mod tests { #[cfg(feature = "nightly")] #[bench] fn bench_parse_incoming(b: &mut Bencher) { - let mut raw = MemBuf::from(b"GET /super_long_uri/and_whatever?what_should_we_talk_about/\ - I_wonder/Hard_to_write_in_an_uri_after_all/you_have_to_make\ - _up_the_punctuation_yourself/how_fun_is_that?test=foo&test1=\ - foo1&test2=foo2&test3=foo3&test4=foo4 HTTP/1.1\r\nHost: \ - hyper.rs\r\nAccept: a lot of things\r\nAccept-Charset: \ - utf8\r\nAccept-Encoding: *\r\nAccess-Control-Allow-\ - Credentials: None\r\nAccess-Control-Allow-Origin: None\r\n\ - Access-Control-Allow-Methods: None\r\nAccess-Control-Allow-\ - Headers: None\r\nContent-Encoding: utf8\r\nContent-Security-\ - Policy: None\r\nContent-Type: text/html\r\nOrigin: hyper\ - \r\nSec-Websocket-Extensions: It looks super important!\r\n\ - Sec-Websocket-Origin: hyper\r\nSec-Websocket-Version: 4.3\r\ - \nStrict-Transport-Security: None\r\nUser-Agent: hyper\r\n\ - X-Content-Duration: None\r\nX-Content-Security-Policy: None\ - \r\nX-DNSPrefetch-Control: None\r\nX-Frame-Options: \ - Something important obviously\r\nX-Requested-With: Nothing\ - \r\n\r\n".to_vec()); + let mut raw = BytesMut::from( + b"GET /super_long_uri/and_whatever?what_should_we_talk_about/\ + I_wonder/Hard_to_write_in_an_uri_after_all/you_have_to_make\ + _up_the_punctuation_yourself/how_fun_is_that?test=foo&test1=\ + foo1&test2=foo2&test3=foo3&test4=foo4 HTTP/1.1\r\nHost: \ + hyper.rs\r\nAccept: a lot of things\r\nAccept-Charset: \ + utf8\r\nAccept-Encoding: *\r\nAccess-Control-Allow-\ + Credentials: None\r\nAccess-Control-Allow-Origin: None\r\n\ + Access-Control-Allow-Methods: None\r\nAccess-Control-Allow-\ + Headers: None\r\nContent-Encoding: utf8\r\nContent-Security-\ + Policy: None\r\nContent-Type: text/html\r\nOrigin: hyper\ + \r\nSec-Websocket-Extensions: It looks super important!\r\n\ + Sec-Websocket-Origin: hyper\r\nSec-Websocket-Version: 4.3\r\ + \nStrict-Transport-Security: None\r\nUser-Agent: hyper\r\n\ + X-Content-Duration: None\r\nX-Content-Security-Policy: None\ + \r\nX-DNSPrefetch-Control: None\r\nX-Frame-Options: \ + Something important obviously\r\nX-Requested-With: Nothing\ + \r\n\r\n".to_vec() + ); + let len = raw.len(); + + b.bytes = len as u64; b.iter(|| { - parse::(&raw).unwrap(); - raw.restart(); + parse::(&mut raw).unwrap(); + restart(&mut raw, len); }); + + + fn restart(b: &mut BytesMut, len: usize) { + b.reserve(1); + unsafe { + b.set_len(len); + } + } } } diff --git a/src/http/io.rs b/src/http/io.rs index 20a981ca..6655c426 100644 --- a/src/http/io.rs +++ b/src/http/io.rs @@ -1,20 +1,20 @@ use std::cmp; use std::fmt; -use std::io::{self, Read, Write}; +use std::io::{self, Write}; use std::ptr; use futures::Async; use tokio::io::Io; use http::{Http1Transaction, h1, MessageHead, ParseResult, DebugTruncate}; -use http::buf::{MemBuf, MemSlice}; +use bytes::{BytesMut, Bytes}; const INIT_BUFFER_SIZE: usize = 4096; pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; pub struct Buffered { io: T, - read_buf: MemBuf, + read_buf: BytesMut, write_buf: WriteBuf, } @@ -31,25 +31,25 @@ impl Buffered { pub fn new(io: T) -> Buffered { Buffered { io: io, - read_buf: MemBuf::new(), + read_buf: BytesMut::with_capacity(0), write_buf: WriteBuf::new(), } } pub fn read_buf(&self) -> &[u8] { - self.read_buf.bytes() + self.read_buf.as_ref() } pub fn consume_leading_lines(&mut self) { if !self.read_buf.is_empty() { let mut i = 0; while i < self.read_buf.len() { - match self.read_buf.bytes()[i] { + match self.read_buf[i] { b'\r' | b'\n' => i += 1, _ => break, } } - self.read_buf.slice(i); + self.read_buf.drain_to(i); } } @@ -59,7 +59,7 @@ impl Buffered { pub fn parse(&mut self) -> ::Result>> { self.reserve_read_buf(); - match self.read_buf.read_from(&mut self.io) { + match self.read_from_io() { Ok(0) => { trace!("parse eof"); return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "parse eof").into()); @@ -70,7 +70,7 @@ impl Buffered { _ => return Err(e.into()) } } - match try!(parse::(&self.read_buf)) { + match try!(parse::(&mut self.read_buf)) { Some(head) => { //trace!("parsed {} bytes out of {}", len, self.read_buf.len()); //self.read_buf.slice(len); @@ -87,8 +87,26 @@ impl Buffered { } } + fn read_from_io(&mut self) -> io::Result { + use bytes::BufMut; + unsafe { + let n = try!(self.io.read(self.read_buf.bytes_mut())); + self.read_buf.advance_mut(n); + Ok(n) + } + } + fn reserve_read_buf(&mut self) { + use bytes::BufMut; + if self.read_buf.remaining_mut() >= INIT_BUFFER_SIZE { + return + } self.read_buf.reserve(INIT_BUFFER_SIZE); + unsafe { + let buf = self.read_buf.bytes_mut(); + let len = buf.len(); + ptr::write_bytes(buf.as_mut_ptr(), 0, len); + } } pub fn buffer>(&mut self, buf: B) -> usize { @@ -101,7 +119,6 @@ impl Buffered { } } - impl Write for Buffered { fn write(&mut self, data: &[u8]) -> io::Result { Ok(self.write_buf.buffer(data)) @@ -122,25 +139,25 @@ impl Write for Buffered { } } -fn parse, I>(rdr: &MemBuf) -> ParseResult { +fn parse, I>(rdr: &mut BytesMut) -> ParseResult { h1::parse::(rdr) } pub trait MemRead { - fn read_mem(&mut self, len: usize) -> io::Result; + fn read_mem(&mut self, len: usize) -> io::Result; } -impl MemRead for Buffered { - fn read_mem(&mut self, len: usize) -> io::Result { +impl MemRead for Buffered { + fn read_mem(&mut self, len: usize) -> io::Result { trace!("Buffered.read_mem read_buf={}, wanted={}", self.read_buf.len(), len); if !self.read_buf.is_empty() { let n = ::std::cmp::min(len, self.read_buf.len()); trace!("Buffered.read_mem read_buf is not empty, slicing {}", n); - Ok(self.read_buf.slice(n)) + Ok(self.read_buf.drain_to(n).freeze()) } else { - self.read_buf.reset(); - let n = try!(self.read_buf.read_from(&mut self.io)); - Ok(self.read_buf.slice(::std::cmp::min(len, n))) + self.reserve_read_buf(); + let n = try!(self.read_from_io()); + Ok(self.read_buf.drain_to(::std::cmp::min(len, n)).freeze()) } } } @@ -288,6 +305,18 @@ impl WriteBuf { } } +#[cfg(test)] +use std::io::Read; + +#[cfg(test)] +impl MemRead for ::mock::AsyncIo { + fn read_mem(&mut self, len: usize) -> io::Result { + let mut v = vec![0; len]; + let n = try!(self.read(v.as_mut_slice())); + Ok(BytesMut::from(&v[..n]).freeze()) + } +} + #[test] fn test_iobuf_write_empty_slice() { use mock::{AsyncIo, Buf as MockBuf}; diff --git a/src/http/mod.rs b/src/http/mod.rs index 464583ed..c1a40119 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -2,6 +2,8 @@ use std::borrow::Cow; use std::fmt; +use bytes::BytesMut; + use header::{Connection, ConnectionOption}; use header::Headers; use method::Method; @@ -13,16 +15,15 @@ use version::HttpVersion::{Http10, Http11}; pub use self::conn::{Conn, KeepAlive, KA}; pub use self::body::{Body, TokioBody}; pub use self::chunk::Chunk; -use self::buf::MemBuf; +pub use self::str::ByteStr; mod body; -#[doc(hidden)] -pub mod buf; mod chunk; mod conn; mod io; mod h1; //mod h2; +mod str; /* macro_rules! nonblocking { @@ -124,8 +125,7 @@ pub enum ClientTransaction {} pub trait Http1Transaction { type Incoming; type Outgoing: Default; - //type KeepAlive: KeepAlive; - fn parse(bytes: &MemBuf) -> ParseResult; + fn parse(bytes: &mut BytesMut) -> ParseResult; fn decoder(head: &MessageHead) -> ::Result; fn encode(head: &mut MessageHead, dst: &mut Vec) -> h1::Encoder; fn should_set_length(head: &MessageHead) -> bool; diff --git a/src/http/str.rs b/src/http/str.rs new file mode 100644 index 00000000..eb9f94c0 --- /dev/null +++ b/src/http/str.rs @@ -0,0 +1,16 @@ +use std::str; + +use bytes::Bytes; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ByteStr(Bytes); + +impl ByteStr { + pub unsafe fn from_utf8_unchecked(slice: Bytes) -> ByteStr { + ByteStr(slice) + } + + pub fn as_str(&self) -> &str { + unsafe { str::from_utf8_unchecked(self.0.as_ref()) } + } +} diff --git a/src/lib.rs b/src/lib.rs index 5cbec6f4..b29aed7c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ #![doc(html_root_url = "https://hyperium.github.io/hyper/")] #![deny(missing_docs)] -#![deny(warnings)] +//#![deny(warnings)] #![deny(missing_debug_implementations)] #![cfg_attr(all(test, feature = "nightly"), feature(test))] @@ -14,6 +14,7 @@ //! [Server](server/index.html), along with a //! [typed Headers system](header/index.html). +extern crate bytes; #[macro_use] extern crate futures; extern crate futures_cpupool; extern crate httparse; diff --git a/src/uri.rs b/src/uri.rs index 83bfce94..b54754e4 100644 --- a/src/uri.rs +++ b/src/uri.rs @@ -2,7 +2,8 @@ use std::borrow::Cow; use std::fmt::{Display, self}; use std::ops::Deref; use std::str::{self, FromStr}; -use http::buf::MemStr; + +use http::ByteStr; use Url; use url::ParseError as UrlError; @@ -279,14 +280,14 @@ impl Display for Uri { } } -pub fn from_mem_str(s: MemStr) -> Result { +pub fn from_mem_str(s: ByteStr) -> Result { Uri::new(InternalUri::Shared(s)) } #[derive(Clone)] enum InternalUri { Cow(Cow<'static, str>), - Shared(MemStr), + Shared(ByteStr), } impl InternalUri {