feat(http): use the bytes crate for Chunk and internally

This commit is contained in:
Sean McArthur
2017-03-01 14:14:34 -08:00
parent cf7cc50ad0
commit 65b3e08f69
14 changed files with 306 additions and 598 deletions

View File

@@ -1,20 +1,20 @@
use std::cmp;
use std::fmt;
use std::io::{self, Read, Write};
use std::io::{self, Write};
use std::ptr;
use futures::Async;
use tokio::io::Io;
use http::{Http1Transaction, h1, MessageHead, ParseResult, DebugTruncate};
use http::buf::{MemBuf, MemSlice};
use bytes::{BytesMut, Bytes};
const INIT_BUFFER_SIZE: usize = 4096;
pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
pub struct Buffered<T> {
io: T,
read_buf: MemBuf,
read_buf: BytesMut,
write_buf: WriteBuf,
}
@@ -31,25 +31,25 @@ impl<T: Io> Buffered<T> {
pub fn new(io: T) -> Buffered<T> {
Buffered {
io: io,
read_buf: MemBuf::new(),
read_buf: BytesMut::with_capacity(0),
write_buf: WriteBuf::new(),
}
}
pub fn read_buf(&self) -> &[u8] {
self.read_buf.bytes()
self.read_buf.as_ref()
}
pub fn consume_leading_lines(&mut self) {
if !self.read_buf.is_empty() {
let mut i = 0;
while i < self.read_buf.len() {
match self.read_buf.bytes()[i] {
match self.read_buf[i] {
b'\r' | b'\n' => i += 1,
_ => break,
}
}
self.read_buf.slice(i);
self.read_buf.drain_to(i);
}
}
@@ -59,7 +59,7 @@ impl<T: Io> Buffered<T> {
pub fn parse<S: Http1Transaction>(&mut self) -> ::Result<Option<MessageHead<S::Incoming>>> {
self.reserve_read_buf();
match self.read_buf.read_from(&mut self.io) {
match self.read_from_io() {
Ok(0) => {
trace!("parse eof");
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "parse eof").into());
@@ -70,7 +70,7 @@ impl<T: Io> Buffered<T> {
_ => return Err(e.into())
}
}
match try!(parse::<S, _>(&self.read_buf)) {
match try!(parse::<S, _>(&mut self.read_buf)) {
Some(head) => {
//trace!("parsed {} bytes out of {}", len, self.read_buf.len());
//self.read_buf.slice(len);
@@ -87,8 +87,26 @@ impl<T: Io> Buffered<T> {
}
}
fn read_from_io(&mut self) -> io::Result<usize> {
use bytes::BufMut;
unsafe {
let n = try!(self.io.read(self.read_buf.bytes_mut()));
self.read_buf.advance_mut(n);
Ok(n)
}
}
fn reserve_read_buf(&mut self) {
use bytes::BufMut;
if self.read_buf.remaining_mut() >= INIT_BUFFER_SIZE {
return
}
self.read_buf.reserve(INIT_BUFFER_SIZE);
unsafe {
let buf = self.read_buf.bytes_mut();
let len = buf.len();
ptr::write_bytes(buf.as_mut_ptr(), 0, len);
}
}
pub fn buffer<B: AsRef<[u8]>>(&mut self, buf: B) -> usize {
@@ -101,7 +119,6 @@ impl<T: Io> Buffered<T> {
}
}
impl<T: Write> Write for Buffered<T> {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
Ok(self.write_buf.buffer(data))
@@ -122,25 +139,25 @@ impl<T: Write> Write for Buffered<T> {
}
}
fn parse<T: Http1Transaction<Incoming=I>, I>(rdr: &MemBuf) -> ParseResult<I> {
fn parse<T: Http1Transaction<Incoming=I>, I>(rdr: &mut BytesMut) -> ParseResult<I> {
h1::parse::<T, I>(rdr)
}
pub trait MemRead {
fn read_mem(&mut self, len: usize) -> io::Result<MemSlice>;
fn read_mem(&mut self, len: usize) -> io::Result<Bytes>;
}
impl<T: Read> MemRead for Buffered<T> {
fn read_mem(&mut self, len: usize) -> io::Result<MemSlice> {
impl<T: Io> MemRead for Buffered<T> {
fn read_mem(&mut self, len: usize) -> io::Result<Bytes> {
trace!("Buffered.read_mem read_buf={}, wanted={}", self.read_buf.len(), len);
if !self.read_buf.is_empty() {
let n = ::std::cmp::min(len, self.read_buf.len());
trace!("Buffered.read_mem read_buf is not empty, slicing {}", n);
Ok(self.read_buf.slice(n))
Ok(self.read_buf.drain_to(n).freeze())
} else {
self.read_buf.reset();
let n = try!(self.read_buf.read_from(&mut self.io));
Ok(self.read_buf.slice(::std::cmp::min(len, n)))
self.reserve_read_buf();
let n = try!(self.read_from_io());
Ok(self.read_buf.drain_to(::std::cmp::min(len, n)).freeze())
}
}
}
@@ -288,6 +305,18 @@ impl WriteBuf {
}
}
#[cfg(test)]
use std::io::Read;
#[cfg(test)]
impl<T: Read> MemRead for ::mock::AsyncIo<T> {
fn read_mem(&mut self, len: usize) -> io::Result<Bytes> {
let mut v = vec![0; len];
let n = try!(self.read(v.as_mut_slice()));
Ok(BytesMut::from(&v[..n]).freeze())
}
}
#[test]
fn test_iobuf_write_empty_slice() {
use mock::{AsyncIo, Buf as MockBuf};