diff --git a/src/http/buf.rs b/src/http/buf.rs
index e458b4c9..646231ae 100644
--- a/src/http/buf.rs
+++ b/src/http/buf.rs
@@ -85,9 +85,7 @@ impl MemBuf {
             trace!("MemBuf::reserve unique access, growing");
             unsafe {
                 let mut vec = &mut *self.buf.get();
-                vec.reserve(needed);
-                let new_cap = vec.capacity();
-                grow_zerofill(vec, new_cap - orig_cap);
+                grow_zerofill(vec, needed);
             }
         } else {
             // we need to allocate more space, but dont have unique
@@ -139,9 +137,32 @@ impl MemBuf {
 
 #[inline]
 unsafe fn grow_zerofill(buf: &mut Vec<u8>, additional: usize) {
-    let len = buf.len();
-    buf.set_len(len + additional);
-    ::std::ptr::write_bytes(buf.as_mut_ptr().offset(len as isize), 0, additional);
+    let orig_cap = buf.capacity();
+    buf.reserve(additional);
+    let new_cap = buf.capacity();
+    let reserved = new_cap - orig_cap;
+    let orig_len = buf.len();
+    zero(buf, orig_len, reserved);
+    buf.set_len(orig_len + reserved);
+
+
+    unsafe fn zero(buf: &mut Vec<u8>, offset: usize, len: usize) {
+        assert!(buf.capacity() >= len + offset,
+            "offset of {} with len of {} is bigger than capacity of {}",
+            offset, len, buf.capacity());
+        ptr::write_bytes(buf.as_mut_ptr().offset(offset as isize), 0, len);
+    }
+}
+
+#[test]
+fn test_grow_zerofill() {
+    for init in 0..100 {
+        for reserve in (0..100).rev() {
+            let mut vec = vec![0; init];
+            unsafe { grow_zerofill(&mut vec, reserve) }
+            assert_eq!(vec.len(), vec.capacity());
+        }
+    }
 }
 
 impl fmt::Debug for MemBuf {
diff --git a/src/http/buffer.rs b/src/http/buffer.rs
deleted file mode 100644
index 4d51709b..00000000
--- a/src/http/buffer.rs
+++ /dev/null
@@ -1,107 +0,0 @@
-use std::cmp;
-use std::io::{self, Write};
-use std::ptr;
-
-
-const INIT_BUFFER_SIZE: usize = 4096;
-pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
-
-#[derive(Debug, Default)]
-pub struct Buffer {
-    vec: Vec<u8>,
-    tail: usize,
-    head: usize,
-}
-
-impl Buffer {
-    pub fn new() -> Buffer {
-        Buffer::default()
-    }
-
-    #[inline]
-    pub fn len(&self) -> usize {
-        self.tail - self.head
-    }
-
-    #[inline]
-    fn available(&self) -> usize {
-        self.vec.len() - self.tail
-    }
-
-    #[inline]
-    pub fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    pub fn write_into<W: Write>(&mut self, w: &mut W) -> io::Result<usize> {
-        if self.is_empty() {
-            Ok(0)
-        } else {
-            let n = try!(w.write(&mut self.vec[self.head..self.tail]));
-            self.head += n;
-            self.maybe_reset();
-            Ok(n)
-        }
-    }
-
-    pub fn write(&mut self, data: &[u8]) -> usize {
-        trace!("Buffer::write len = {:?}", data.len());
-        self.maybe_reserve(data.len());
-        let len = cmp::min(self.available(), data.len());
-        assert!(self.available() >= len);
-        unsafe {
-            // in rust 1.9, we could use slice::copy_from_slice
-            ptr::copy(
-                data.as_ptr(),
-                self.vec.as_mut_ptr().offset(self.tail as isize),
-                len
-            );
-        }
-        self.tail += len;
-        len
-    }
-
-    #[inline]
-    fn maybe_reserve(&mut self, needed: usize) {
-        let cap = self.vec.len();
-        if cap == 0 {
-            // first reserve
-            let init = cmp::max(INIT_BUFFER_SIZE, needed);
-            trace!("reserving initial {}", init);
-            self.vec = vec![0; init];
-        } else if self.head > 0 && self.tail == cap && self.head >= needed {
-            // there is space to shift over
-            let count = self.tail - self.head;
-            trace!("moving buffer bytes over by {}", count);
-            unsafe {
-                ptr::copy(
-                    self.vec.as_ptr().offset(self.head as isize),
-                    self.vec.as_mut_ptr(),
-                    count
-                );
-            }
-            self.tail -= count;
-            self.head = 0;
-        } else if self.tail == cap && cap < MAX_BUFFER_SIZE {
-            self.vec.reserve(cmp::min(cap * 4, MAX_BUFFER_SIZE) - cap);
-            let new = self.vec.capacity() - cap;
-            trace!("reserved {}", new);
-            unsafe { grow_zerofill(&mut self.vec, new) }
-        }
-    }
-
-    #[inline]
-    fn maybe_reset(&mut self) {
-        if self.tail != 0 && self.tail == self.head {
-            self.tail = 0;
-            self.head = 0;
-        }
-    }
-}
-
-#[inline]
-unsafe fn grow_zerofill(buf: &mut Vec<u8>, additional: usize) {
-    let len = buf.len();
-    buf.set_len(len + additional);
-    ptr::write_bytes(buf.as_mut_ptr().offset(len as isize), 0, additional);
-}
diff --git a/src/http/conn.rs b/src/http/conn.rs
index 3f4f0fe2..832f0544 100644
--- a/src/http/conn.rs
+++ b/src/http/conn.rs
@@ -610,12 +610,14 @@ mod tests {
 
     #[test]
     fn test_conn_body_write_length() {
+        extern crate pretty_env_logger;
         use ::futures::Future;
+        let _ = pretty_env_logger::init();
         let _: Result<(), ()> = ::futures::lazy(|| {
             let io = AsyncIo::new_buf(vec![], 0);
             let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default());
-            let max = ::http::buffer::MAX_BUFFER_SIZE + 4096;
-            conn.state.writing = Writing::Body(Encoder::length(max as u64), None);
+            let max = ::http::io::MAX_BUFFER_SIZE + 4096;
+            conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64), None);
 
             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 4].into()) }).unwrap().is_ready());
             match conn.state.writing {
@@ -623,7 +625,7 @@ mod tests {
                 _ => panic!("writing did not queue chunk: {:?}", conn.state.writing),
             }
 
-            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; max - 8192].into()) }).unwrap().is_ready());
+            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; max].into()) }).unwrap().is_ready());
 
             match conn.state.writing {
                 Writing::Body(_, Some(_)) => {},
@@ -636,7 +638,8 @@ mod tests {
             assert!(conn.poll_complete().unwrap().is_not_ready());
             conn.io.io_mut().block_in(1024 * 3);
             assert!(conn.poll_complete().unwrap().is_not_ready());
-            conn.io.io_mut().block_in(max);
+            conn.io.io_mut().block_in(max * 2);
+            assert!(conn.poll_complete().unwrap().is_not_ready());
             assert!(conn.poll_complete().unwrap().is_ready());
 
             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 4].into()) }).unwrap().is_ready());
diff --git a/src/http/io.rs b/src/http/io.rs
index 7f9024a5..53ec497d 100644
--- a/src/http/io.rs
+++ b/src/http/io.rs
@@ -1,12 +1,13 @@
+use std::cmp;
 use std::fmt;
 use std::io::{self, Read, Write};
+use std::ptr;
 
 use futures::Async;
 use tokio::io::Io;
 
 use http::{Http1Transaction, h1, MessageHead, ParseResult};
 use http::buf::{MemBuf, MemSlice};
-use http::buffer::Buffer;
 
 const INIT_BUFFER_SIZE: usize = 4096;
 pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
@@ -14,7 +15,7 @@ pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
 pub struct Buffered<T> {
     io: T,
     read_buf: MemBuf,
-    write_buf: Buffer,
+    write_buf: WriteBuf,
 }
 
 impl<T> fmt::Debug for Buffered<T> {
@@ -31,7 +32,7 @@ impl<T: Io> Buffered<T> {
         Buffered {
             io: io,
             read_buf: MemBuf::new(),
-            write_buf: Buffer::new(),
+            write_buf: WriteBuf::new(),
         }
     }
 
@@ -91,7 +92,7 @@ impl<T: Io> Buffered<T> {
     }
 
     pub fn buffer<B: AsRef<[u8]>>(&mut self, buf: B) {
-        self.write_buf.write(buf.as_ref());
+        self.write_buf.buffer(buf.as_ref());
     }
 
     #[cfg(test)]
@@ -121,12 +122,12 @@ impl<T: Read> Read for Buffered<T> {
 
 impl<T: Write> Write for Buffered<T> {
     fn write(&mut self, data: &[u8]) -> io::Result<usize> {
-        Ok(self.write_buf.write(data))
+        Ok(self.write_buf.buffer(data))
     }
 
     fn flush(&mut self) -> io::Result<()> {
         self.write_buf.write_into(&mut self.io).and_then(|_n| {
-            if self.write_buf.is_empty() {
+            if self.write_buf.remaining() == 0 {
                 Ok(())
             } else {
                 Err(io::Error::new(io::ErrorKind::WouldBlock, "wouldblock"))
@@ -177,14 +178,20 @@ impl<T: AsRef<[u8]>> Cursor<T> {
         self.pos >= self.bytes.as_ref().len()
     }
 
-    /*
     pub fn write_to<W: Write>(&mut self, dst: &mut W) -> io::Result<usize> {
-        dst.write(&self.bytes.as_ref()[self.pos..]).map(|n| {
-            self.pos += n;
-            n
-        })
+        if self.remaining() == 0 {
+            Ok(0)
+        } else {
+            dst.write(&self.bytes.as_ref()[self.pos..]).map(|n| {
+                self.pos += n;
+                n
+            })
+        }
+    }
+
+    fn remaining(&self) -> usize {
+        self.bytes.as_ref().len() - self.pos
     }
-    */
 
     #[inline]
     pub fn buf(&self) -> &[u8] {
@@ -201,8 +208,15 @@ impl<T: AsRef<[u8]>> Cursor<T> {
 
 impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         let bytes = self.buf();
-        let reasonable_max = ::std::cmp::min(bytes.len(), 32);
-        write!(f, "Cursor({:?})", &bytes[..reasonable_max])
+        if bytes.len() > 32 {
+            try!(f.write_str("Cursor(["));
+            for byte in &bytes[..32] {
+                try!(write!(f, "{:?}, ", byte));
+            }
+            write!(f, "... {}])", bytes.len())
+        } else {
+            write!(f, "Cursor({:?})", &bytes)
+        }
     }
 }
@@ -230,6 +244,66 @@ impl AtomicWrite for T {
 
 }
 //}
+// an internal buffer to collect writes before flushes
+#[derive(Debug)]
+struct WriteBuf(Cursor<Vec<u8>>);
+
+impl WriteBuf {
+    fn new() -> WriteBuf {
+        WriteBuf(Cursor::new(Vec::new()))
+    }
+
+    fn write_into<W: Write>(&mut self, w: &mut W) -> io::Result<usize> {
+        self.0.write_to(w)
+    }
+
+    fn buffer(&mut self, data: &[u8]) -> usize {
+        trace!("WriteBuf::buffer() len = {:?}", data.len());
+        self.maybe_reset();
+        self.maybe_reserve(data.len());
+        let mut vec = &mut self.0.bytes;
+        let len = cmp::min(vec.capacity() - vec.len(), data.len());
+        assert!(vec.capacity() - vec.len() >= len);
+        unsafe {
+            // in rust 1.9, we could use slice::copy_from_slice
+            ptr::copy(
+                data.as_ptr(),
+                vec.as_mut_ptr().offset(vec.len() as isize),
+                len
+            );
+            let new_len = vec.len() + len;
+            vec.set_len(new_len);
+        }
+        len
+    }
+
+    fn remaining(&self) -> usize {
+        self.0.remaining()
+    }
+
+    #[inline]
+    fn maybe_reserve(&mut self, needed: usize) {
+        let mut vec = &mut self.0.bytes;
+        let cap = vec.capacity();
+        if cap == 0 {
+            let init = cmp::max(INIT_BUFFER_SIZE, needed);
+            trace!("WriteBuf reserving initial {}", init);
+            vec.reserve(init);
+        } else if cap < MAX_BUFFER_SIZE {
+            vec.reserve(cmp::min(needed, MAX_BUFFER_SIZE - cap));
+            trace!("WriteBuf reserved {}", vec.capacity() - cap);
+        }
+    }
+
+    fn maybe_reset(&mut self) {
+        if self.0.pos != 0 && self.0.remaining() == 0 {
+            self.0.pos = 0;
+            unsafe {
+                self.0.bytes.set_len(0);
+            }
+        }
+    }
+}
 
 #[test]
 fn test_iobuf_write_empty_slice() {
diff --git a/src/http/mod.rs b/src/http/mod.rs
index abe73c98..4073dd7e 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -16,7 +16,6 @@ pub use self::chunk::Chunk;
 
 mod body;
 mod buf;
-mod buffer;
 mod chunk;
 mod conn;
 mod io;
diff --git a/src/mock.rs b/src/mock.rs
index e13c35e6..db969032 100644
--- a/src/mock.rs
+++ b/src/mock.rs
@@ -105,6 +105,7 @@ impl Write for AsyncIo {
         } else if self.bytes_until_block == 0 {
             Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block"))
         } else {
+            trace!("AsyncIo::write() block_in = {}, data.len() = {}", self.bytes_until_block, data.len());
             let n = cmp::min(self.bytes_until_block, data.len());
             let n = try!(self.inner.write(&data[..n]));
             self.bytes_until_block -= n;