From be461b4663974788eb3caa4d654d190f05e29c09 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Sat, 7 Jan 2017 09:08:17 -0800 Subject: [PATCH] perf(http): introduce MemBuf, a shared read buffer --- .travis.yml | 1 + benches/end_to_end.rs | 55 ++++++++--- src/http/buf.rs | 214 ++++++++++++++++++++++++++++++++++++++++++ src/http/buffer.rs | 47 +--------- src/http/chunk.rs | 16 +++- src/http/conn.rs | 56 +++++++---- src/http/h1/decode.rs | 210 ++++++++++++++++++++--------------------- src/http/io.rs | 56 +++++++++-- src/http/mod.rs | 2 +- src/mock.rs | 24 ----- src/net.rs | 200 --------------------------------------- src/server/request.rs | 4 +- tests/server.rs | 44 ++------- 13 files changed, 473 insertions(+), 456 deletions(-) create mode 100644 src/http/buf.rs diff --git a/.travis.yml b/.travis.yml index 5f414dd3..ae548cd2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,6 +18,7 @@ script: - cargo build --verbose $FEATURES - cargo test --verbose $FEATURES - 'for f in ./doc/**/*.md; do echo "Running rustdoc on $f"; rustdoc -L ./target/debug -L ./target/debug/deps --test $f; done' + - 'if [ $TRAVIS_RUST_VERSION = nightly ]; then cargo bench $FEATURES; fi' addons: apt: diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 50d0dc11..6e6a5043 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -1,21 +1,24 @@ #![feature(test)] +#![deny(warnings)] extern crate futures; extern crate hyper; extern crate tokio_core; +extern crate pretty_env_logger; extern crate test; use futures::{Future, Stream}; use tokio_core::reactor::Core; +use hyper::client; use hyper::header::{ContentLength, ContentType}; -use hyper::server::{Service, Request, Response}; +use hyper::Method; +use hyper::server::{self, Service}; #[bench] -fn one_request_at_a_time(b: &mut test::Bencher) { - extern crate pretty_env_logger; +fn get_one_at_a_time(b: &mut test::Bencher) { let _ = pretty_env_logger::init(); let mut core = Core::new().unwrap(); let handle = core.handle(); @@ -23,11 +26,11 @@ fn one_request_at_a_time(b: &mut test::Bencher) { let addr = hyper::Server::http(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap() .handle(|| Ok(Hello), &handle).unwrap(); - let mut client = hyper::Client::new(&handle); + let client = hyper::Client::new(&handle); let url: hyper::Url = format!("http://{}/get", addr).parse().unwrap(); - b.bytes = 160; + b.bytes = 160 * 2 + PHRASE.len() as u64; b.iter(move || { let work = client.get(url.clone()).and_then(|res| { res.body().for_each(|_chunk| { @@ -39,19 +42,49 @@ fn one_request_at_a_time(b: &mut test::Bencher) { }); } -static PHRASE: &'static [u8] = b"Hello, World!"; +#[bench] +fn post_one_at_a_time(b: &mut test::Bencher) { + let _ = pretty_env_logger::init(); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let addr = hyper::Server::http(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap() + .handle(|| Ok(Hello), &handle).unwrap(); + + let client = hyper::Client::new(&handle); + + let url: hyper::Url = format!("http://{}/get", addr).parse().unwrap(); + + let post = "foo bar baz quux"; + b.bytes = 180 * 2 + post.len() as u64 + PHRASE.len() as u64; + b.iter(move || { + let mut req = client::Request::new(Method::Post, url.clone()); + req.headers_mut().set(ContentLength(post.len() as u64)); + req.set_body(post); + + let work = client.get(url.clone()).and_then(|res| { + res.body().for_each(|_chunk| { + Ok(()) + }) + }); + + core.run(work).unwrap(); + }); +} + +static PHRASE: &'static [u8] = include_bytes!("../CHANGELOG.md"); //b"Hello, World!"; #[derive(Clone, Copy)] struct Hello; impl Service for Hello { - type Request = Request; - type Response = Response; + type Request = server::Request; + type Response = server::Response; type Error = hyper::Error; - type Future = ::futures::Finished; - fn call(&mut self, _req: Request) -> Self::Future { + type Future = ::futures::Finished; + fn call(&self, _req: Self::Request) -> Self::Future { ::futures::finished( - Response::new() + server::Response::new() .with_header(ContentLength(PHRASE.len() as u64)) .with_header(ContentType::plaintext()) .with_body(PHRASE) diff --git a/src/http/buf.rs b/src/http/buf.rs new file mode 100644 index 00000000..16793949 --- /dev/null +++ b/src/http/buf.rs @@ -0,0 +1,214 @@ +use std::cell::UnsafeCell; +use std::fmt; +use std::io::{self, Read}; +use std::ptr; +use std::sync::Arc; + +pub struct MemBuf { + buf: Arc>>, + start: usize, + end: usize, +} + +impl MemBuf { + pub fn new() -> MemBuf { + MemBuf::with_capacity(0) + } + + pub fn with_capacity(cap: usize) -> MemBuf { + MemBuf { + buf: Arc::new(UnsafeCell::new(vec![0; cap])), + start: 0, + end: 0, + } + } + + pub fn bytes(&self) -> &[u8] { + &self.buf()[self.start..self.end] + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn len(&self) -> usize { + self.end - self.start + } + + pub fn capacity(&self) -> usize { + self.buf().len() + } + + pub fn read_from(&mut self, io: &mut R) -> io::Result { + let start = self.end - self.start; + let n = try!(io.read(&mut self.buf_mut()[start..])); + self.end += n; + Ok(n) + } + + pub fn slice(&mut self, len: usize) -> MemSlice { + assert!(self.end - self.start >= len); + let start = self.start; + self.start += len; + MemSlice { + buf: self.buf.clone(), + start: start, + end: self.start, + } + } + + pub fn reserve(&mut self, needed: usize) { + let orig_cap = self.capacity(); + let remaining = orig_cap - self.end; + if remaining >= needed { + // all done + return + } + let is_unique = Arc::get_mut(&mut self.buf).is_some(); + trace!("MemBuf::reserve {} access", if is_unique { "unique" } else { "shared" }); + if is_unique && remaining + self.start >= needed { + // we have unique access, we can mutate this vector + trace!("MemBuf::reserve unique access, shifting"); + unsafe { + let mut buf = &mut *self.buf.get(); + let len = self.len(); + ptr::copy( + buf.as_ptr().offset(self.start as isize), + buf.as_mut_ptr(), + len + ); + self.start = 0; + self.end = len; + } + } else if is_unique { + // we have unique access, we can mutate this vector + trace!("MemBuf::reserve unique access, growing"); + unsafe { + let mut vec = &mut *self.buf.get(); + vec.reserve(needed); + let new_cap = vec.capacity(); + grow_zerofill(vec, new_cap - orig_cap); + } + } else { + // we need to allocate more space, but dont have unique + // access, so we need to make a new buffer + trace!("MemBuf::reserve shared buffer, creating new"); + let mut new = MemBuf::with_capacity(needed); + unsafe { + ptr::copy_nonoverlapping( + self.bytes().as_ptr(), + new.buf_mut().as_mut_ptr(), + self.len() + ); + } + new.end = self.len(); + *self = new; + } + } + + pub fn reset(&mut self) { + match Arc::get_mut(&mut self.buf) { + Some(_) => { + trace!("MemBuf::reset was unique, re-using"); + self.start = 0; + self.end = 0; + }, + None => { + trace!("MemBuf::reset not unique, creating new MemBuf"); + *self = MemBuf::with_capacity(self.buf().len()); + } + } + } + + fn buf_mut(&mut self) -> &mut [u8] { + // The contract here is that we NEVER have a MemSlice that exists + // with slice.end > self.start. + // In other words, we should *ALWAYS* be the only instance that can + // look at the bytes on the right side of self.start. + unsafe { + &mut (*self.buf.get())[self.start..] + } + } + + fn buf(&self) -> &Vec { + unsafe { + &*self.buf.get() + } + } +} + +#[inline] +unsafe fn grow_zerofill(buf: &mut Vec, additional: usize) { + let len = buf.len(); + buf.set_len(len + additional); + ::std::ptr::write_bytes(buf.as_mut_ptr().offset(len as isize), 0, buf.len()); +} + +impl fmt::Debug for MemBuf { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MemBuf") + .field("start", &self.start) + .field("end", &self.end) + .field("buf", &&self.buf()[self.start..self.end]) + .finish() + } +} + +pub struct MemSlice { + buf: Arc>>, + start: usize, + end: usize, +} + +impl MemSlice { + pub fn empty() -> MemSlice { + MemSlice { + buf: Arc::new(UnsafeCell::new(Vec::new())), + start: 0, + end: 0, + } + } +} + + +#[cfg(test)] +impl ::http::io::MemRead for ::mock::AsyncIo { + fn read_mem(&mut self, len: usize) -> io::Result { + let mut v = vec![0; len]; + let n = try!(self.read(v.as_mut_slice())); + v.truncate(n); + Ok(MemSlice { + buf: Arc::new(UnsafeCell::new(v)), + start: 0, + end: n, + }) + } +} + +impl fmt::Debug for MemSlice { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl ::std::ops::Deref for MemSlice { + type Target = [u8]; + fn deref(&self) -> &[u8] { + unsafe { + &(*self.buf.get())[self.start..self.end] + } + } +} + +unsafe impl Send for MemBuf {} +unsafe impl Send for MemSlice {} + +/* +#[cfg(test)] +mod tests { + use super::{MemBuf}; + + #[test] + fn test_ +} +*/ diff --git a/src/http/buffer.rs b/src/http/buffer.rs index 3c8ef504..0005a9c2 100644 --- a/src/http/buffer.rs +++ b/src/http/buffer.rs @@ -1,5 +1,5 @@ use std::cmp; -use std::io::{self, Read, Write}; +use std::io::{self, Write}; use std::ptr; @@ -18,10 +18,6 @@ impl Buffer { Buffer::default() } - pub fn reset(&mut self) { - *self = Buffer::new() - } - #[inline] pub fn len(&self) -> usize { self.tail - self.head @@ -32,50 +28,11 @@ impl Buffer { self.vec.len() - self.tail } - #[inline] - pub fn is_max_size(&self) -> bool { - self.len() >= MAX_BUFFER_SIZE - } - #[inline] pub fn is_empty(&self) -> bool { self.len() == 0 } - #[inline] - pub fn bytes(&self) -> &[u8] { - &self.vec[self.head..self.tail] - } - - #[inline] - pub fn consume(&mut self, pos: usize) { - debug_assert!(self.tail >= self.head + pos); - self.head += pos; - if self.head == self.tail { - self.head = 0; - self.tail = 0; - } - } - - pub fn consume_leading_lines(&mut self) { - while !self.is_empty() { - match self.vec[self.head] { - b'\r' | b'\n' => { - self.consume(1); - }, - _ => return - } - } - } - - pub fn read_from(&mut self, r: &mut R) -> io::Result { - self.maybe_reserve(1); - let n = try!(r.read(&mut self.vec[self.tail..])); - self.tail += n; - self.maybe_reset(); - Ok(n) - } - pub fn write_into(&mut self, w: &mut W) -> io::Result { if self.is_empty() { Ok(0) @@ -146,5 +103,5 @@ impl Buffer { unsafe fn grow_zerofill(buf: &mut Vec, additional: usize) { let len = buf.len(); buf.set_len(len + additional); - ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()); + ptr::write_bytes(buf.as_mut_ptr().offset(len as isize), 0, buf.len()); } diff --git a/src/http/chunk.rs b/src/http/chunk.rs index 45179adf..04e598fe 100644 --- a/src/http/chunk.rs +++ b/src/http/chunk.rs @@ -1,13 +1,15 @@ -use std::borrow::Borrow; use std::fmt; use std::sync::Arc; +use http::buf::MemSlice; + /// A piece of a message body. pub struct Chunk(Inner); enum Inner { Owned(Vec), Referenced(Arc>), + Mem(MemSlice), Static(&'static [u8]), } @@ -46,6 +48,12 @@ impl From<&'static str> for Chunk { } } +impl From for Chunk { + fn from(mem: MemSlice) -> Chunk { + Chunk(Inner::Mem(mem)) + } +} + impl ::std::ops::Deref for Chunk { type Target = [u8]; @@ -60,10 +68,8 @@ impl AsRef<[u8]> for Chunk { fn as_ref(&self) -> &[u8] { match self.0 { Inner::Owned(ref vec) => vec, - Inner::Referenced(ref vec) => { - let v: &Vec = vec.borrow(); - v.as_slice() - } + Inner::Referenced(ref vec) => vec, + Inner::Mem(ref slice) => slice, Inner::Static(slice) => slice, } } diff --git a/src/http/conn.rs b/src/http/conn.rs index 9258efbe..3f4f0fe2 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -48,8 +48,7 @@ impl Conn { match self.state.reading { Reading::Init | Reading::Body(..) => self.io.poll_read().is_ready(), - Reading::KeepAlive | - Reading::Closed => true, + Reading::KeepAlive | Reading::Closed => true, } } @@ -84,15 +83,24 @@ impl Conn { Ok(Some(head)) => (head.version, head), Ok(None) => return Ok(Async::NotReady), Err(e) => { + let must_respond_with_error = !self.state.was_idle(); self.state.close(); self.io.consume_leading_lines(); - if !self.io.read_buf().is_empty() { + let ret = if !self.io.read_buf().is_empty() { error!("parse error ({}) with bytes: {:?}", e, self.io.read_buf()); - return Ok(Async::Ready(Some(Frame::Error { error: e }))); + Ok(Async::Ready(Some(Frame::Error { error: e }))) } else { trace!("parse error with 0 input, err = {:?}", e); - return Ok(Async::Ready(None)); - } + if must_respond_with_error { + match e { + ::Error::Io(io) => Err(io), + other => Err(io::Error::new(io::ErrorKind::UnexpectedEof, other)), + } + } else { + Ok(Async::Ready(None)) + } + }; + return ret; } }; @@ -106,6 +114,7 @@ impl Conn { return Ok(Async::Ready(Some(Frame::Error { error: e }))); } }; + self.state.busy(); let wants_keep_alive = head.should_keep_alive(); self.state.keep_alive &= wants_keep_alive; let (body, reading) = if decoder.is_eof() { @@ -131,12 +140,9 @@ impl Conn { let (reading, ret) = match self.state.reading { Reading::Body(ref mut decoder) => { - //TODO use an appendbuf or something - let mut buf = vec![0; 1024 * 4]; - let n = try_nb!(decoder.decode(&mut self.io, &mut buf)); - if n > 0 { - buf.truncate(n); - return Ok(Async::Ready(Some(http::Chunk::from(buf)))); + let slice = try_nb!(decoder.decode(&mut self.io)); + if !slice.is_empty() { + return Ok(Async::Ready(Some(http::Chunk::from(slice)))); } else { if decoder.is_eof() { (Reading::KeepAlive, Ok(Async::Ready(None))) @@ -219,11 +225,13 @@ impl Conn { wbuf.consume(n); if !wbuf.is_written() { + trace!("Conn::start_send frame not written, queued"); *queued = Some(wbuf); } }, Err(e) => match e.kind() { io::ErrorKind::WouldBlock => { + trace!("Conn::start_send frame not written, queued"); *queued = Some(wbuf); }, _ => return Err(e) @@ -258,9 +266,9 @@ impl Conn { trace!("Conn::write_queued complete = {}", complete); if complete { *queued = None; - Ok(Async::NotReady) - } else { Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) } }, _ => Ok(Async::Ready(())), @@ -268,14 +276,14 @@ impl Conn { } fn flush(&mut self) -> Poll<(), io::Error> { - try_nb!(self.write_queued()); + let ret = try!(self.write_queued()); try_nb!(self.io.flush()); self.state.try_keep_alive(); trace!("flushed {:?}", self.state); if self.is_read_ready() { ::futures::task::park().unpark(); } - Ok(Async::Ready(())) + Ok(ret) } } @@ -461,7 +469,7 @@ impl KeepAlive for KA { impl State { fn close(&mut self) { - trace!("State::close"); + trace!("State::close()"); self.reading = Reading::Closed; self.writing = Writing::Closed; self.keep_alive.disable(); @@ -486,6 +494,18 @@ impl State { } } + fn was_idle(&self) -> bool { + if let KA::Idle(..) = self.keep_alive.status() { + true + } else { + false + } + } + + fn busy(&mut self) { + self.keep_alive.busy(); + } + fn is_read_closed(&self) -> bool { match self.reading { Reading::Closed => true, @@ -522,7 +542,7 @@ impl<'a, T: fmt::Debug + 'a> fmt::Debug for DebugFrame<'a, T> { }, Frame::Body { chunk: None } => { f.debug_struct("Body") - .field("chunk", &"None") + .field("chunk", &None::<()>) .finish() }, Frame::Error { ref error } => { diff --git a/src/http/h1/decode.rs b/src/http/h1/decode.rs index 8e65d544..aaef02ea 100644 --- a/src/http/h1/decode.rs +++ b/src/http/h1/decode.rs @@ -1,5 +1,8 @@ -use std::{cmp, usize}; -use std::io::{self, Read}; +use std::usize; +use std::io; + +use http::buf::MemSlice; +use http::io::MemRead; use self::Kind::{Length, Chunked, Eof}; @@ -76,15 +79,16 @@ impl Decoder { } impl Decoder { - pub fn decode(&mut self, body: &mut R, buf: &mut [u8]) -> io::Result { + pub fn decode(&mut self, body: &mut R) -> io::Result { match self.kind { Length(ref mut remaining) => { trace!("Sized read, remaining={:?}", remaining); if *remaining == 0 { - Ok(0) + Ok(MemSlice::empty()) } else { - let to_read = cmp::min(*remaining as usize, buf.len()); - let num = try!(body.read(&mut buf[..to_read])) as u64; + let to_read = *remaining as usize; + let buf = try!(body.read_mem(to_read)); + let num = buf.len() as u64; trace!("Length read: {}", num); if num > *remaining { *remaining = 0; @@ -93,30 +97,37 @@ impl Decoder { } else { *remaining -= num; } - Ok(num as usize) + Ok(buf) } } Chunked(ref mut state, ref mut size) => { loop { - let mut read = 0; + let mut buf = None; // advances the chunked state - *state = try!(state.step(body, size, buf, &mut read)); + *state = try!(state.step(body, size, &mut buf)); if *state == ChunkedState::End { trace!("end of chunked"); - return Ok(0); + return Ok(MemSlice::empty()); } - if read > 0 { - return Ok(read); + if let Some(buf) = buf { + return Ok(buf); } } } Eof(ref mut is_eof) => { - match body.read(buf) { - Ok(0) => { - *is_eof = true; - Ok(0) + if *is_eof { + Ok(MemSlice::empty()) + } else { + // 8192 chosen because its about 2 packets, there probably + // won't be that much available, so don't have MemReaders + // allocate buffers to big + match body.read_mem(8192) { + Ok(slice) => { + *is_eof = slice.is_empty(); + Ok(slice) + } + other => other, } - other => other, } } } @@ -125,29 +136,29 @@ impl Decoder { macro_rules! byte ( ($rdr:ident) => ({ - let mut buf = [0]; - match try!($rdr.read(&mut buf)) { - 1 => buf[0], - _ => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, - "Unexpected eof during chunk size line")), + let buf = try!($rdr.read_mem(1)); + if !buf.is_empty() { + buf[0] + } else { + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, + "Unexpected eof during chunk size line")); } }) ); impl ChunkedState { - fn step(&self, - body: &mut R, - size: &mut u64, - buf: &mut [u8], - read: &mut usize) - -> io::Result { + fn step(&self, + body: &mut R, + size: &mut u64, + buf: &mut Option) + -> io::Result { use self::ChunkedState::*; Ok(match *self { Size => try!(ChunkedState::read_size(body, size)), SizeLws => try!(ChunkedState::read_size_lws(body)), Extension => try!(ChunkedState::read_extension(body)), SizeLf => try!(ChunkedState::read_size_lf(body, size)), - Body => try!(ChunkedState::read_body(body, size, buf, read)), + Body => try!(ChunkedState::read_body(body, size, buf)), BodyCr => try!(ChunkedState::read_body_cr(body)), BodyLf => try!(ChunkedState::read_body_lf(body)), EndCr => try!(ChunkedState::read_end_cr(body)), @@ -155,8 +166,8 @@ impl ChunkedState { End => ChunkedState::End, }) } - fn read_size(rdr: &mut R, size: &mut u64) -> io::Result { - trace!("Read size"); + fn read_size(rdr: &mut R, size: &mut u64) -> io::Result { + trace!("Read chunk hex size"); let radix = 16; match byte!(rdr) { b @ b'0'...b'9' => { @@ -181,7 +192,7 @@ impl ChunkedState { } Ok(ChunkedState::Size) } - fn read_size_lws(rdr: &mut R) -> io::Result { + fn read_size_lws(rdr: &mut R) -> io::Result { trace!("read_size_lws"); match byte!(rdr) { // LWS can follow the chunk size, but no more digits can come @@ -194,14 +205,14 @@ impl ChunkedState { } } } - fn read_extension(rdr: &mut R) -> io::Result { + fn read_extension(rdr: &mut R) -> io::Result { trace!("read_extension"); match byte!(rdr) { b'\r' => return Ok(ChunkedState::SizeLf), _ => return Ok(ChunkedState::Extension), // no supported extensions } } - fn read_size_lf(rdr: &mut R, size: &mut u64) -> io::Result { + fn read_size_lf(rdr: &mut R, size: &mut u64) -> io::Result { trace!("Chunk size is {:?}", size); match byte!(rdr) { b'\n' if *size > 0 => Ok(ChunkedState::Body), @@ -210,10 +221,9 @@ impl ChunkedState { } } - fn read_body(rdr: &mut R, + fn read_body(rdr: &mut R, rem: &mut u64, - buf: &mut [u8], - read: &mut usize) + buf: &mut Option) -> io::Result { trace!("Chunked read, remaining={:?}", rem); @@ -223,19 +233,16 @@ impl ChunkedState { r => r as usize, }; - let to_read = cmp::min(rem_cap, buf.len()); - let count = try!(rdr.read(&mut buf[..to_read])); - - trace!("to_read = {}", to_read); - trace!("count = {}", count); + let to_read = rem_cap; + let slice = try!(rdr.read_mem(to_read)); + let count = slice.len(); if count == 0 { *rem = 0; return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")); } - + *buf = Some(slice); *rem -= count as u64; - *read = count; if *rem > 0 { Ok(ChunkedState::Body) @@ -243,26 +250,26 @@ impl ChunkedState { Ok(ChunkedState::BodyCr) } } - fn read_body_cr(rdr: &mut R) -> io::Result { + fn read_body_cr(rdr: &mut R) -> io::Result { match byte!(rdr) { b'\r' => Ok(ChunkedState::BodyLf), _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body CR")), } } - fn read_body_lf(rdr: &mut R) -> io::Result { + fn read_body_lf(rdr: &mut R) -> io::Result { match byte!(rdr) { b'\n' => Ok(ChunkedState::Size), _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body LF")), } } - fn read_end_cr(rdr: &mut R) -> io::Result { + fn read_end_cr(rdr: &mut R) -> io::Result { match byte!(rdr) { b'\r' => Ok(ChunkedState::EndLf), _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end CR")), } } - fn read_end_lf(rdr: &mut R) -> io::Result { + fn read_end_lf(rdr: &mut R) -> io::Result { match byte!(rdr) { b'\n' => Ok(ChunkedState::End), _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end LF")), @@ -277,8 +284,23 @@ mod tests { use std::io::Write; use super::Decoder; use super::ChunkedState; + use http::io::MemRead; + use http::buf::{MemBuf, MemSlice}; use mock::AsyncIo; + impl<'a> MemRead for &'a [u8] { + fn read_mem(&mut self, len: usize) -> io::Result { + let n = ::std::cmp::min(len, self.len()); + if n > 0 { + let mut buf = MemBuf::with_capacity(n); + buf.read_from(self).unwrap(); + Ok(buf.slice(n)) + } else { + Ok(MemSlice::empty()) + } + } + } + #[test] fn test_read_chunk_size() { use std::io::ErrorKind::{UnexpectedEof, InvalidInput}; @@ -287,13 +309,10 @@ mod tests { let mut state = ChunkedState::Size; let mut rdr = &mut s.as_bytes(); let mut size = 0; - let mut count = 0; loop { - let mut buf = [0u8; 10]; - let result = state.step(&mut rdr, &mut size, &mut buf, &mut count); + let result = state.step(rdr, &mut size, &mut None); let desc = format!("read_size failed for {:?}", s); state = result.expect(desc.as_str()); - trace!("State {:?}", state); if state == ChunkedState::Body || state == ChunkedState::EndCr { break; } @@ -305,10 +324,8 @@ mod tests { let mut state = ChunkedState::Size; let mut rdr = &mut s.as_bytes(); let mut size = 0; - let mut count = 0; loop { - let mut buf = [0u8; 10]; - let result = state.step(&mut rdr, &mut size, &mut buf, &mut count); + let result = state.step(rdr, &mut size, &mut None); state = match result { Ok(s) => s, Err(e) => { @@ -317,7 +334,6 @@ mod tests { return; } }; - trace!("State {:?}", state); if state == ChunkedState::Body || state == ChunkedState::End { panic!(format!("Was Ok. Expected Err for {:?}", s)); } @@ -359,9 +375,8 @@ mod tests { fn test_read_sized_early_eof() { let mut bytes = &b"foo bar"[..]; let mut decoder = Decoder::length(10); - let mut buf = [0u8; 10]; - assert_eq!(decoder.decode(&mut bytes, &mut buf).unwrap(), 7); - let e = decoder.decode(&mut bytes, &mut buf).unwrap_err(); + assert_eq!(decoder.decode(&mut bytes).unwrap().len(), 7); + let e = decoder.decode(&mut bytes).unwrap_err(); assert_eq!(e.kind(), io::ErrorKind::Other); assert_eq!(e.description(), "early eof"); } @@ -373,68 +388,63 @@ mod tests { foo bar\ "[..]; let mut decoder = Decoder::chunked(); - let mut buf = [0u8; 10]; - assert_eq!(decoder.decode(&mut bytes, &mut buf).unwrap(), 7); - let e = decoder.decode(&mut bytes, &mut buf).unwrap_err(); + assert_eq!(decoder.decode(&mut bytes).unwrap().len(), 7); + let e = decoder.decode(&mut bytes).unwrap_err(); assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof); assert_eq!(e.description(), "early eof"); } #[test] fn test_read_chunked_single_read() { - let content = b"10\r\n1234567890abcdef\r\n0\r\n"; - let mut mock_buf = io::Cursor::new(content); - let mut buf = [0u8; 16]; - let count = Decoder::chunked().decode(&mut mock_buf, &mut buf).expect("decode"); - assert_eq!(16, count); + let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..]; + let buf = Decoder::chunked().decode(&mut mock_buf).expect("decode"); + assert_eq!(16, buf.len()); let result = String::from_utf8(buf.to_vec()).expect("decode String"); assert_eq!("1234567890abcdef", &result); } #[test] fn test_read_chunked_after_eof() { - let content = b"10\r\n1234567890abcdef\r\n0\r\n\r\n"; - let mut mock_buf = io::Cursor::new(content); - let mut buf = [0u8; 50]; + let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..]; let mut decoder = Decoder::chunked(); // normal read - let count = decoder.decode(&mut mock_buf, &mut buf).expect("decode"); - assert_eq!(16, count); - let result = String::from_utf8(buf[0..count].to_vec()).expect("decode String"); + let buf = decoder.decode(&mut mock_buf).expect("decode"); + assert_eq!(16, buf.len()); + let result = String::from_utf8(buf.to_vec()).expect("decode String"); assert_eq!("1234567890abcdef", &result); // eof read - let count = decoder.decode(&mut mock_buf, &mut buf).expect("decode"); - assert_eq!(0, count); + let buf = decoder.decode(&mut mock_buf).expect("decode"); + assert_eq!(0, buf.len()); // ensure read after eof also returns eof - let count = decoder.decode(&mut mock_buf, &mut buf).expect("decode"); - assert_eq!(0, count); + let buf = decoder.decode(&mut mock_buf).expect("decode"); + assert_eq!(0, buf.len()); } // perform an async read using a custom buffer size and causing a blocking // read at the specified byte fn read_async(mut decoder: Decoder, content: &[u8], - block_at: usize, - read_buffer_size: usize) + block_at: usize) -> String { let content_len = content.len(); - let mock_buf = io::Cursor::new(content.clone()); - let mut ins = AsyncIo::new(mock_buf, block_at); - let mut outs = vec![]; + let mut ins = AsyncIo::new(content, block_at); + let mut outs = Vec::new(); loop { - let mut buf = vec![0; read_buffer_size]; - match decoder.decode(&mut ins, buf.as_mut_slice()) { - Ok(0) => break, - Ok(i) => outs.write(&buf[0..i]).expect("write buffer"), - Err(e) => { - if e.kind() != io::ErrorKind::WouldBlock { - break; + match decoder.decode(&mut ins) { + Ok(buf) => { + if buf.is_empty() { + break; // eof } - ins.block_in(content_len); // we only block once - 0 as usize + outs.write(&buf).expect("write buffer"); + } + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock => { + ins.block_in(content_len); // we only block once + }, + _ => panic!("unexpected decode error: {}", e), } }; } @@ -442,22 +452,12 @@ mod tests { } // iterate over the different ways that this async read could go. - // tests every combination of buffer size that is passed in, with a blocking - // read at each byte along the content - The shotgun approach + // tests blocking a read at each byte along the content - The shotgun approach fn all_async_cases(content: &str, expected: &str, decoder: Decoder) { let content_len = content.len(); for block_at in 0..content_len { - for read_buffer_size in 1..content_len { - let actual = read_async(decoder.clone(), - content.as_bytes(), - block_at, - read_buffer_size); - assert_eq!(expected, - &actual, - "Failed async. Blocking at {} with read buffer size {}", - block_at, - read_buffer_size); - } + let actual = read_async(decoder.clone(), content.as_bytes(), block_at); + assert_eq!(expected, &actual, "Failed async. Blocking at {}", block_at); } } @@ -469,7 +469,7 @@ mod tests { #[test] fn test_read_chunked_async() { - let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n"; + let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n"; let expected = "foobar"; all_async_cases(content, expected, Decoder::chunked()); } diff --git a/src/http/io.rs b/src/http/io.rs index 656b2cc5..7f9024a5 100644 --- a/src/http/io.rs +++ b/src/http/io.rs @@ -5,11 +5,15 @@ use futures::Async; use tokio::io::Io; use http::{Http1Transaction, h1, MessageHead, ParseResult}; +use http::buf::{MemBuf, MemSlice}; use http::buffer::Buffer; +const INIT_BUFFER_SIZE: usize = 4096; +pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; + pub struct Buffered { io: T, - read_buf: Buffer, + read_buf: MemBuf, write_buf: Buffer, } @@ -26,7 +30,7 @@ impl Buffered { pub fn new(io: T) -> Buffered { Buffered { io: io, - read_buf: Buffer::new(), + read_buf: MemBuf::new(), write_buf: Buffer::new(), } } @@ -36,7 +40,16 @@ impl Buffered { } pub fn consume_leading_lines(&mut self) { - self.read_buf.consume_leading_lines(); + if !self.read_buf.is_empty() { + let mut i = 0; + while i < self.read_buf.len() { + match self.read_buf.bytes()[i] { + b'\r' | b'\n' => i += 1, + _ => break, + } + } + self.read_buf.slice(i); + } } pub fn poll_read(&mut self) -> Async<()> { @@ -44,6 +57,7 @@ impl Buffered { } pub fn parse(&mut self) -> ::Result>> { + self.reserve_read_buf(); match self.read_buf.read_from(&mut self.io) { Ok(0) => { trace!("parse eof"); @@ -58,11 +72,11 @@ impl Buffered { match try!(parse::(self.read_buf.bytes())) { Some((head, len)) => { trace!("parsed {} bytes out of {}", len, self.read_buf.len()); - self.read_buf.consume(len); + self.read_buf.slice(len); Ok(Some(head)) }, None => { - if self.read_buf.is_max_size() { + if self.read_buf.capacity() >= MAX_BUFFER_SIZE { debug!("MAX_BUFFER_SIZE reached, closing"); Err(::Error::TooLarge) } else { @@ -72,6 +86,10 @@ impl Buffered { } } + fn reserve_read_buf(&mut self) { + self.read_buf.reserve(INIT_BUFFER_SIZE); + } + pub fn buffer>(&mut self, buf: B) { self.write_buf.write(buf.as_ref()); } @@ -82,9 +100,12 @@ impl Buffered { } } +/* impl Read for Buffered { fn read(&mut self, buf: &mut [u8]) -> io::Result { trace!("Buffered.read self={}, buf={}", self.read_buf.len(), buf.len()); + unimplemented!() + /* let n = try!(self.read_buf.bytes().read(buf)); self.read_buf.consume(n); if n == 0 { @@ -93,8 +114,10 @@ impl Read for Buffered { } else { Ok(n) } + */ } } +*/ impl Write for Buffered { fn write(&mut self, data: &[u8]) -> io::Result { @@ -111,10 +134,30 @@ impl Write for Buffered { }) } } + fn parse, I>(rdr: &[u8]) -> ParseResult { h1::parse::(rdr) } +pub trait MemRead { + fn read_mem(&mut self, len: usize) -> io::Result; +} + +impl MemRead for Buffered { + fn read_mem(&mut self, len: usize) -> io::Result { + trace!("Buffered.read_mem read_buf={}, wanted={}", self.read_buf.len(), len); + if !self.read_buf.is_empty() { + let n = ::std::cmp::min(len, self.read_buf.len()); + trace!("Buffered.read_mem read_buf is not empty, slicing {}", n); + Ok(self.read_buf.slice(n)) + } else { + self.read_buf.reset(); + let n = try!(self.read_buf.read_from(&mut self.io)); + Ok(self.read_buf.slice(::std::cmp::min(len, n))) + } + } +} + #[derive(Clone)] pub struct Cursor> { bytes: T, @@ -181,9 +224,6 @@ impl AtomicWrite for T { */ impl AtomicWrite for T { fn write_atomic(&mut self, bufs: &[&[u8]]) -> io::Result { - if cfg!(not(windows)) { - warn!("write_atomic not using writev"); - } let vec = bufs.concat(); self.write(&vec) } diff --git a/src/http/mod.rs b/src/http/mod.rs index 587b6289..abe73c98 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -15,7 +15,7 @@ pub use self::body::{Body, TokioBody}; pub use self::chunk::Chunk; mod body; -//mod buf; +mod buf; mod buffer; mod chunk; mod conn; diff --git a/src/mock.rs b/src/mock.rs index cc870f11..e13c35e6 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -52,18 +52,6 @@ impl Read for Buf { } } -impl ::vecio::Writev for Buf { - fn writev(&mut self, bufs: &[&[u8]]) -> io::Result { - let cap = bufs.iter().map(|buf| buf.len()).fold(0, |total, next| total + next); - let mut vec = Vec::with_capacity(cap); - for &buf in bufs { - vec.extend(buf); - } - - self.write(&vec) - } -} - #[derive(Debug)] pub struct AsyncIo { inner: T, @@ -147,18 +135,6 @@ impl Io for AsyncIo { } } -impl ::vecio::Writev for AsyncIo { - fn writev(&mut self, bufs: &[&[u8]]) -> io::Result { - let cap = bufs.iter().map(|buf| buf.len()).fold(0, |total, next| total + next); - let mut vec = Vec::with_capacity(cap); - for &buf in bufs { - vec.extend(buf); - } - - self.write(&vec) - } -} - impl ::std::ops::Deref for AsyncIo { type Target = [u8]; diff --git a/src/net.rs b/src/net.rs index 63ce09b5..8b137891 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,201 +1 @@ -//! A collection of traits abstracting over Listeners and Streams. -use std::io::{self, Read, Write}; -use std::net::{SocketAddr}; -use std::option; -use std::net::{TcpStream, TcpListener}; - - -/// An alias to `mio::tcp::TcpStream`. -//#[derive(Debug)] -pub struct HttpStream(pub ::tokio::net::TcpStream); - -impl Read for HttpStream { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.0.read(buf) - } -} - -impl Write for HttpStream { - #[inline] - fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.write(buf) - } - - #[inline] - fn flush(&mut self) -> io::Result<()> { - self.0.flush() - } -} - -/* -#[cfg(not(windows))] -impl ::vecio::Writev for HttpStream { - #[inline] - fn writev(&mut self, bufs: &[&[u8]]) -> io::Result { - use ::vecio::Rawv; - self.0.writev(bufs) - } -} -*/ - -/// An alias to `mio::tcp::TcpListener`. -#[derive(Debug)] -pub struct HttpListener(pub TcpListener); - -impl HttpListener { - /// Bind to a socket address. - pub fn bind(addr: &SocketAddr) -> io::Result { - TcpListener::bind(addr) - .map(HttpListener) - } - - /// Try to duplicate the underlying listening socket. - pub fn try_clone(&self) -> io::Result { - self.0.try_clone().map(HttpListener) - } -} - -/* -/// An abstraction to allow any SSL implementation to be used with client-side `HttpsStream`s. -pub trait SslClient { - /// The protected stream. - type Stream: Transport; - /// Wrap a client stream with SSL. - fn wrap_client(&self, stream: HttpStream, host: &str) -> ::Result; -} - -/// An abstraction to allow any SSL implementation to be used with server-side `HttpsStream`s. -pub trait SslServer { - /// The protected stream. - type Stream: Transport; - /// Wrap a server stream with SSL. - fn wrap_server(&self, stream: HttpStream) -> ::Result; -} - - -/// A stream over the HTTP protocol, possibly protected by TLS. -#[derive(Debug)] -pub enum HttpsStream { - /// A plain text stream. - Http(HttpStream), - /// A stream protected by TLS. - Https(S) -} - -impl Read for HttpsStream { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match *self { - HttpsStream::Http(ref mut s) => s.read(buf), - HttpsStream::Https(ref mut s) => s.read(buf) - } - } -} - -impl Write for HttpsStream { - #[inline] - fn write(&mut self, msg: &[u8]) -> io::Result { - match *self { - HttpsStream::Http(ref mut s) => s.write(msg), - HttpsStream::Https(ref mut s) => s.write(msg) - } - } - - #[inline] - fn flush(&mut self) -> io::Result<()> { - match *self { - HttpsStream::Http(ref mut s) => s.flush(), - HttpsStream::Https(ref mut s) => s.flush() - } - } -} - -/* -#[cfg(not(windows))] -impl ::vecio::Writev for HttpsStream { - #[inline] - fn writev(&mut self, bufs: &[&[u8]]) -> io::Result { - match *self { - HttpsStream::Http(ref mut s) => s.writev(bufs), - HttpsStream::Https(ref mut s) => s.writev(bufs) - } - } -} -*/ - - -/* -#[cfg(unix)] -impl ::std::os::unix::io::AsRawFd for HttpStream { - #[inline] - fn as_raw_fd(&self) -> ::std::os::unix::io::RawFd { - self.0.as_raw_fd() - } -} - -#[cfg(unix)] -impl ::std::os::unix::io::AsRawFd for HttpsStream { - #[inline] - fn as_raw_fd(&self) -> ::std::os::unix::io::RawFd { - match *self { - HttpsStream::Http(ref s) => s.as_raw_fd(), - HttpsStream::Https(ref s) => s.as_raw_fd(), - } - } -} -*/ - -/// An `HttpListener` over SSL. -#[derive(Debug)] -pub struct HttpsListener { - listener: TcpListener, - ssl: S, -} - -impl HttpsListener { - /// Start listening to an address over HTTPS. - #[inline] - pub fn new(addr: &SocketAddr, ssl: S) -> io::Result> { - TcpListener::bind(addr).map(|l| HttpsListener { - listener: l, - ssl: ssl - }) - } - - /// Construct an `HttpsListener` from a bound `TcpListener`. - pub fn with_listener(listener: TcpListener, ssl: S) -> HttpsListener { - HttpsListener { - listener: listener, - ssl: ssl - } - } -} - -/* -impl Accept for HttpsListener { - type Output = S::Stream; - - #[inline] - fn accept(&self) -> io::Result> { - self.listener.accept().and_then(|s| match s { - Some((s, _)) => self.ssl.wrap_server(HttpStream(s)).map(Some).map_err(|e| { - match e { - ::Error::Io(e) => e, - _ => io::Error::new(io::ErrorKind::Other, e), - - } - }), - None => Ok(None), - }) - } - - #[inline] - fn local_addr(&self) -> io::Result { - self.listener.local_addr() - } -} -*/ - -*/ diff --git a/src/server/request.rs b/src/server/request.rs index 8bc9cb64..ce23b6ff 100644 --- a/src/server/request.rs +++ b/src/server/request.rs @@ -92,8 +92,8 @@ impl fmt::Debug for Request { pub fn new(addr: SocketAddr, incoming: RequestHead, body: Body) -> Request { let MessageHead { version, subject: RequestLine(method, uri), headers } = incoming; - debug!("Request Line: {:?} {:?} {:?}", method, uri, version); - debug!("{:#?}", headers); + debug!("Request::new: addr={}, req=\"{} {} {}\"", addr, method, uri, version); + debug!("Request::new: headers={:?}", headers); Request { method: method, diff --git a/tests/server.rs b/tests/server.rs index f5a00de7..d38b9b4b 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -2,6 +2,7 @@ extern crate hyper; extern crate futures; extern crate spmc; +extern crate pretty_env_logger; use futures::Future; use futures::stream::Stream; @@ -9,6 +10,7 @@ use futures::stream::Stream; use std::net::{TcpStream, SocketAddr}; use std::io::{Read, Write}; use std::sync::mpsc; +use std::thread; use std::time::Duration; use hyper::server::{Server, Request, Response, Service, NewService}; @@ -25,12 +27,6 @@ impl Serve { self.listening.as_ref().unwrap().addr() } - /* - fn head(&self) -> Request { - unimplemented!() - } - */ - fn body(&self) -> Vec { let mut buf = vec![]; while let Ok(Msg::Chunk(msg)) = self.msg_rx.try_recv() { @@ -152,7 +148,7 @@ fn serve() -> Serve { } fn serve_with_timeout(dur: Option) -> Serve { - use std::thread; + let _ = pretty_env_logger::init(); let (thread_tx, thread_rx) = mpsc::channel(); let (spawn_tx, spawn_rx) = mpsc::channel(); @@ -195,7 +191,7 @@ fn server_get_should_ignore_body() { req.write_all(b"\ GET / HTTP/1.1\r\n\ Host: example.domain\r\n\ - Connection: close\r\n + Connection: close\r\n\ \r\n\ I shouldn't be read.\r\n\ ").unwrap(); @@ -223,7 +219,6 @@ fn server_get_with_body() { #[test] fn server_get_fixed_response() { - let foo_bar = b"foo bar baz"; let server = serve(); server.reply() @@ -256,7 +251,7 @@ fn server_get_chunked_response() { req.write_all(b"\ GET / HTTP/1.1\r\n\ Host: example.domain\r\n\ - Connection: close\r\n + Connection: close\r\n\ \r\n\ ").unwrap(); let mut body = String::new(); @@ -358,8 +353,8 @@ fn server_empty_response_chunked() { req.write_all(b"\ GET / HTTP/1.1\r\n\ Host: example.domain\r\n\ - Content-Length: 0\r\n - Connection: close\r\n + Content-Length: 0\r\n\ + Connection: close\r\n\ \r\n\ ").unwrap(); @@ -462,28 +457,3 @@ fn server_keep_alive() { } } } - -/* -#[test] -fn server_get_with_body_three_listeners() { - let server = serve_n(3); - let addrs = server.addrs(); - assert_eq!(addrs.len(), 3); - - for (i, addr) in addrs.iter().enumerate() { - let mut req = TcpStream::connect(addr).unwrap(); - write!(req, "\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Content-Length: 17\r\n\ - \r\n\ - I'm sending to {}.\r\n\ - ", i).unwrap(); - req.read(&mut [0; 256]).unwrap(); - - // note: doesnt include trailing \r\n, cause Content-Length wasn't 19 - let comparison = format!("I'm sending to {}.", i).into_bytes(); - assert_eq!(server.body(), comparison); - } -} -*/