perf(http): introduce MemBuf, a shared read buffer

This commit is contained in:
Sean McArthur
2017-01-07 09:08:17 -08:00
parent 2d2d5574a6
commit be461b4663
13 changed files with 473 additions and 456 deletions

View File

@@ -5,11 +5,15 @@ use futures::Async;
use tokio::io::Io;
use http::{Http1Transaction, h1, MessageHead, ParseResult};
use http::buf::{MemBuf, MemSlice};
use http::buffer::Buffer;
const INIT_BUFFER_SIZE: usize = 4096;
pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
pub struct Buffered<T> {
io: T,
read_buf: Buffer,
read_buf: MemBuf,
write_buf: Buffer,
}
@@ -26,7 +30,7 @@ impl<T: Io> Buffered<T> {
pub fn new(io: T) -> Buffered<T> {
Buffered {
io: io,
read_buf: Buffer::new(),
read_buf: MemBuf::new(),
write_buf: Buffer::new(),
}
}
@@ -36,7 +40,16 @@ impl<T: Io> Buffered<T> {
}
pub fn consume_leading_lines(&mut self) {
self.read_buf.consume_leading_lines();
if !self.read_buf.is_empty() {
let mut i = 0;
while i < self.read_buf.len() {
match self.read_buf.bytes()[i] {
b'\r' | b'\n' => i += 1,
_ => break,
}
}
self.read_buf.slice(i);
}
}
pub fn poll_read(&mut self) -> Async<()> {
@@ -44,6 +57,7 @@ impl<T: Io> Buffered<T> {
}
pub fn parse<S: Http1Transaction>(&mut self) -> ::Result<Option<MessageHead<S::Incoming>>> {
self.reserve_read_buf();
match self.read_buf.read_from(&mut self.io) {
Ok(0) => {
trace!("parse eof");
@@ -58,11 +72,11 @@ impl<T: Io> Buffered<T> {
match try!(parse::<S, _>(self.read_buf.bytes())) {
Some((head, len)) => {
trace!("parsed {} bytes out of {}", len, self.read_buf.len());
self.read_buf.consume(len);
self.read_buf.slice(len);
Ok(Some(head))
},
None => {
if self.read_buf.is_max_size() {
if self.read_buf.capacity() >= MAX_BUFFER_SIZE {
debug!("MAX_BUFFER_SIZE reached, closing");
Err(::Error::TooLarge)
} else {
@@ -72,6 +86,10 @@ impl<T: Io> Buffered<T> {
}
}
fn reserve_read_buf(&mut self) {
self.read_buf.reserve(INIT_BUFFER_SIZE);
}
pub fn buffer<B: AsRef<[u8]>>(&mut self, buf: B) {
self.write_buf.write(buf.as_ref());
}
@@ -82,9 +100,12 @@ impl<T: Io> Buffered<T> {
}
}
/*
impl<T: Read> Read for Buffered<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
trace!("Buffered.read self={}, buf={}", self.read_buf.len(), buf.len());
unimplemented!()
/*
let n = try!(self.read_buf.bytes().read(buf));
self.read_buf.consume(n);
if n == 0 {
@@ -93,8 +114,10 @@ impl<T: Read> Read for Buffered<T> {
} else {
Ok(n)
}
*/
}
}
*/
impl<T: Write> Write for Buffered<T> {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
@@ -111,10 +134,30 @@ impl<T: Write> Write for Buffered<T> {
})
}
}
fn parse<T: Http1Transaction<Incoming=I>, I>(rdr: &[u8]) -> ParseResult<I> {
h1::parse::<T, I>(rdr)
}
pub trait MemRead {
fn read_mem(&mut self, len: usize) -> io::Result<MemSlice>;
}
impl<T: Read> MemRead for Buffered<T> {
fn read_mem(&mut self, len: usize) -> io::Result<MemSlice> {
trace!("Buffered.read_mem read_buf={}, wanted={}", self.read_buf.len(), len);
if !self.read_buf.is_empty() {
let n = ::std::cmp::min(len, self.read_buf.len());
trace!("Buffered.read_mem read_buf is not empty, slicing {}", n);
Ok(self.read_buf.slice(n))
} else {
self.read_buf.reset();
let n = try!(self.read_buf.read_from(&mut self.io));
Ok(self.read_buf.slice(::std::cmp::min(len, n)))
}
}
}
#[derive(Clone)]
pub struct Cursor<T: AsRef<[u8]>> {
bytes: T,
@@ -181,9 +224,6 @@ impl<T: Write + ::vecio::Writev> AtomicWrite for T {
*/
impl<T: Write> AtomicWrite for T {
fn write_atomic(&mut self, bufs: &[&[u8]]) -> io::Result<usize> {
if cfg!(not(windows)) {
warn!("write_atomic not using writev");
}
let vec = bufs.concat();
self.write(&vec)
}