use std::cmp;
use std::fmt;
use std::io::{self, Write};
use std::ptr;

use futures::{Async, Poll};
use tokio_io::{AsyncRead, AsyncWrite};

use super::{Http1Transaction, MessageHead};
use bytes::{BytesMut, Bytes};

const INIT_BUFFER_SIZE: usize = 8192;
pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;

/// Wraps an IO transport with a read buffer used for parsing and a write
/// buffer that collects output until it can be flushed.
pub struct Buffered<T> {
    flush_pipeline: bool,
    io: T,
    read_blocked: bool,
    read_buf: BytesMut,
    write_buf: WriteBuf,
}

impl<T> fmt::Debug for Buffered<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Buffered")
            .field("read_buf", &self.read_buf)
            .field("write_buf", &self.write_buf)
            .finish()
    }
}

impl<T: AsyncRead + AsyncWrite> Buffered<T> {
    pub fn new(io: T) -> Buffered<T> {
        Buffered {
            flush_pipeline: false,
            io: io,
            read_buf: BytesMut::with_capacity(0),
            write_buf: WriteBuf::new(),
            read_blocked: false,
        }
    }

    pub fn set_flush_pipeline(&mut self, enabled: bool) {
        self.flush_pipeline = enabled;
    }

    pub fn read_buf(&self) -> &[u8] {
        self.read_buf.as_ref()
    }

    pub fn write_buf_mut(&mut self) -> &mut Vec<u8> {
        self.write_buf.maybe_reset();
        self.write_buf.maybe_reserve(0);
        &mut self.write_buf.0.bytes
    }

    /// Consumes any leading CR or LF bytes, which may be left over from a
    /// previous message's line ending.
    pub fn consume_leading_lines(&mut self) {
        if !self.read_buf.is_empty() {
            let mut i = 0;
            while i < self.read_buf.len() {
                match self.read_buf[i] {
                    b'\r' | b'\n' => i += 1,
                    _ => break,
                }
            }
            self.read_buf.split_to(i);
        }
    }

    pub fn parse<S: Http1Transaction>(&mut self) -> Poll<MessageHead<S::Incoming>, ::Error> {
        loop {
            match try!(S::parse(&mut self.read_buf)) {
                Some(head) => {
                    //trace!("parsed {} bytes out of {}", len, self.read_buf.len());
                    return Ok(Async::Ready(head.0))
                },
                None => {
                    if self.read_buf.capacity() >= MAX_BUFFER_SIZE {
                        debug!("MAX_BUFFER_SIZE reached, closing");
                        return Err(::Error::TooLarge);
                    }
                },
            }
            match try_ready!(self.read_from_io()) {
                0 => {
                    trace!("parse eof");
                    //TODO: With Rust 1.14, this can be Error::from(ErrorKind)
                    return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ParseEof).into());
                }
                _ => {},
            }
        }
    }

    pub fn read_from_io(&mut self) -> Poll<usize, io::Error> {
        use bytes::BufMut;
        self.read_blocked = false;
        //TODO: use io.read_buf(), so we don't have to zero memory
        //Reason this doesn't use it yet is because benchmarks show the
        //slightest **decrease** in performance. Switching should be done
        //when it doesn't cost anything.
        if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE {
            self.read_buf.reserve(INIT_BUFFER_SIZE);
            unsafe { // Zero out the unused memory before handing it to read()
                let buf = self.read_buf.bytes_mut();
                let len = buf.len();
                ptr::write_bytes(buf.as_mut_ptr(), 0, len);
            }
        }
        unsafe {
            let n = match self.io.read(self.read_buf.bytes_mut()) {
                Ok(n) => n,
                Err(e) => {
                    if e.kind() == io::ErrorKind::WouldBlock {
                        self.read_blocked = true;
                        return Ok(Async::NotReady);
                    }
                    return Err(e)
                }
            };
            self.read_buf.advance_mut(n);
            Ok(Async::Ready(n))
        }
    }

    pub fn buffer<B: AsRef<[u8]>>(&mut self, buf: B) -> usize {
        self.write_buf.buffer(buf.as_ref())
    }

    pub fn io_mut(&mut self) -> &mut T {
        &mut self.io
    }

    pub fn is_read_blocked(&self) -> bool {
        self.read_blocked
    }
}

impl<T: Write> Write for Buffered<T> {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        let n = self.write_buf.buffer(data);
        if n == 0 {
            Err(io::ErrorKind::WouldBlock.into())
        } else {
            Ok(n)
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        // When pipelining, delay the flush while more requests are already
        // buffered, so several responses can go out in one write.
        if self.flush_pipeline && !self.read_buf.is_empty() {
            Ok(())
        } else if self.write_buf.remaining() == 0 {
            self.io.flush()
        } else {
            loop {
                let n = try!(self.write_buf.write_into(&mut self.io));
                trace!("flushed {} bytes", n);
                if self.write_buf.remaining() == 0 {
                    break;
                }
            }
            self.io.flush()
        }
    }
}

/// A trait for reading bytes through an internal buffer, handing them out
/// as cheaply-cloneable `Bytes` slices.
pub trait MemRead {
    fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error>;
}

impl<T: AsyncRead + AsyncWrite> MemRead for Buffered<T> {
    fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
        trace!("Buffered.read_mem read_buf={}, wanted={}", self.read_buf.len(), len);
        if !self.read_buf.is_empty() {
            let n = ::std::cmp::min(len, self.read_buf.len());
            trace!("Buffered.read_mem read_buf is not empty, slicing {}", n);
            Ok(Async::Ready(self.read_buf.split_to(n).freeze()))
        } else {
            let n = try_ready!(self.read_from_io());
            Ok(Async::Ready(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()))
        }
    }
}
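// An illustrative sketch, not part of the original test suite: shows how
// consume_leading_lines strips stray CRLFs left in the read buffer by a
// previous message while leaving the next request line intact. Assumes
// mock::AsyncIo and MockBuf behave as in the tests at the bottom of this
// file; the test name and buffer contents are made up for illustration.
#[test]
fn test_consume_leading_lines_skips_crlf() {
    use mock::{AsyncIo, Buf as MockBuf};

    let mut buffered = Buffered::new(AsyncIo::new(MockBuf::new(), 0));
    // seed the read buffer directly; same-module access to the field
    buffered.read_buf = BytesMut::from(&b"\r\n\r\nGET / HTTP/1.1\r\n"[..]);
    buffered.consume_leading_lines();
    assert_eq!(buffered.read_buf(), &b"GET / HTTP/1.1\r\n"[..]);
}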
/// A cursor over some contiguous bytes, tracking how many of them have
/// already been written out.
#[derive(Clone)]
pub struct Cursor<T> {
    bytes: T,
    pos: usize,
}

impl<T: AsRef<[u8]>> Cursor<T> {
    pub fn new(bytes: T) -> Cursor<T> {
        Cursor {
            bytes: bytes,
            pos: 0,
        }
    }

    pub fn has_started(&self) -> bool {
        self.pos != 0
    }

    pub fn is_written(&self) -> bool {
        trace!("Cursor::is_written pos = {}, len = {}", self.pos, self.bytes.as_ref().len());
        self.pos >= self.bytes.as_ref().len()
    }

    pub fn write_to<W: Write>(&mut self, dst: &mut W) -> io::Result<usize> {
        if self.remaining() == 0 {
            Ok(0)
        } else {
            dst.write(&self.bytes.as_ref()[self.pos..]).map(|n| {
                self.pos += n;
                n
            })
        }
    }

    fn remaining(&self) -> usize {
        self.bytes.as_ref().len() - self.pos
    }

    #[inline]
    pub fn buf(&self) -> &[u8] {
        &self.bytes.as_ref()[self.pos..]
    }

    #[inline]
    pub fn consume(&mut self, num: usize) {
        trace!("Cursor::consume({})", num);
        self.pos = ::std::cmp::min(self.bytes.as_ref().len(), self.pos + num);
    }
}

impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Cursor")
            .field("pos", &self.pos)
            .field("len", &self.bytes.as_ref().len())
            .finish()
    }
}
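// An illustrative sketch, not part of the original test suite: exercises
// the Cursor position bookkeeping that WriteBuf relies on. write_to
// advances `pos` by however many bytes the destination accepted, and
// is_written reports whether everything has been handed out. The test
// name is made up for illustration.
#[test]
fn test_cursor_write_to_advances_pos() {
    let mut cursor = Cursor::new(b"hello".to_vec());
    assert!(!cursor.has_started());

    let mut dst: Vec<u8> = Vec::new();
    assert_eq!(cursor.write_to(&mut dst).unwrap(), 5);
    assert_eq!(dst, &b"hello"[..]);
    assert!(cursor.has_started());
    assert!(cursor.is_written());

    // once exhausted, further writes are no-ops
    assert_eq!(cursor.write_to(&mut dst).unwrap(), 0);
}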
pub trait AtomicWrite {
    fn write_atomic(&mut self, data: &[&[u8]]) -> io::Result<usize>;
}

/*
#[cfg(not(windows))]
impl<T: Write + ::vecio::Writev> AtomicWrite for T {
    fn write_atomic(&mut self, bufs: &[&[u8]]) -> io::Result<usize> {
        self.writev(bufs)
    }
}

#[cfg(windows)]
*/
impl<T: Write> AtomicWrite for T {
    fn write_atomic(&mut self, bufs: &[&[u8]]) -> io::Result<usize> {
        if bufs.len() == 1 {
            self.write(bufs[0])
        } else {
            let vec = bufs.concat();
            self.write(&vec)
        }
    }
}
//}

// an internal buffer to collect writes before flushes
#[derive(Debug)]
struct WriteBuf(Cursor<Vec<u8>>);

impl WriteBuf {
    fn new() -> WriteBuf {
        WriteBuf(Cursor::new(Vec::new()))
    }

    fn write_into<W: Write>(&mut self, w: &mut W) -> io::Result<usize> {
        self.0.write_to(w)
    }

    fn buffer(&mut self, data: &[u8]) -> usize {
        trace!("WriteBuf::buffer() len = {:?}", data.len());
        self.maybe_reset();
        self.maybe_reserve(data.len());
        let vec = &mut self.0.bytes;
        let len = cmp::min(vec.capacity() - vec.len(), data.len());
        assert!(vec.capacity() - vec.len() >= len);
        unsafe {
            // in rust 1.9, we could use slice::copy_from_slice
            ptr::copy(
                data.as_ptr(),
                vec.as_mut_ptr().offset(vec.len() as isize),
                len
            );
            let new_len = vec.len() + len;
            vec.set_len(new_len);
        }
        len
    }

    fn remaining(&self) -> usize {
        self.0.remaining()
    }

    #[inline]
    fn maybe_reserve(&mut self, needed: usize) {
        let vec = &mut self.0.bytes;
        let cap = vec.capacity();
        if cap == 0 {
            let init = cmp::min(MAX_BUFFER_SIZE, cmp::max(INIT_BUFFER_SIZE, needed));
            trace!("WriteBuf reserving initial {}", init);
            vec.reserve(init);
        } else if cap < MAX_BUFFER_SIZE {
            vec.reserve(cmp::min(needed, MAX_BUFFER_SIZE - cap));
            trace!("WriteBuf reserved {}", vec.capacity() - cap);
        }
    }

    fn maybe_reset(&mut self) {
        if self.0.pos != 0 && self.0.remaining() == 0 {
            self.0.pos = 0;
            unsafe {
                self.0.bytes.set_len(0);
            }
        }
    }
}

#[derive(Debug)]
struct ParseEof;

impl fmt::Display for ParseEof {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("parse eof")
    }
}

impl ::std::error::Error for ParseEof {
    fn description(&self) -> &str {
        "parse eof"
    }
}

// TODO: Move tests to their own mod
#[cfg(test)]
use std::io::Read;

#[cfg(test)]
impl<T: Read> MemRead for ::mock::AsyncIo<T> {
    fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
        let mut v = vec![0; len];
        let n = try_nb!(self.read(v.as_mut_slice()));
        Ok(Async::Ready(BytesMut::from(&v[..n]).freeze()))
    }
}

#[test]
fn test_iobuf_write_empty_slice() {
    use mock::{AsyncIo, Buf as MockBuf};

    let mut mock = AsyncIo::new(MockBuf::new(), 256);
    mock.error(io::Error::new(io::ErrorKind::Other, "logic error"));

    let mut io_buf = Buffered::new(mock);

    // underlying io will return the logic error upon write,
    // so we are testing that the io_buf does not trigger a write
    // when there is nothing to flush
    io_buf.flush().expect("should short-circuit flush");
}

#[test]
fn test_parse_reads_until_blocked() {
    use mock::{AsyncIo, Buf as MockBuf};
    // missing last line ending
    let raw = "HTTP/1.1 200 OK\r\n";

    let mock = AsyncIo::new(MockBuf::wrap(raw.into()), raw.len());
    let mut buffered = Buffered::new(mock);
    assert_eq!(buffered.parse::<super::ClientTransaction>().unwrap(), Async::NotReady);
    assert!(buffered.io.blocked());
}
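// An illustrative sketch, not part of the original test suite: once every
// buffered byte has been written out, maybe_reset rewinds the cursor so
// the existing allocation is reused instead of the Vec growing forever.
// The test name is made up; it pokes at WriteBuf's private internals,
// which is fine from within this module.
#[test]
fn test_write_buf_resets_after_drain() {
    let mut write_buf = WriteBuf::new();
    assert_eq!(write_buf.buffer(b"abc"), 3);

    let mut dst: Vec<u8> = Vec::new();
    write_buf.write_into(&mut dst).expect("write_into");
    assert_eq!(write_buf.remaining(), 0);

    // buffering again rewinds to the start of the same allocation
    assert_eq!(write_buf.buffer(b"de"), 2);
    assert_eq!(write_buf.0.pos, 0);
    assert_eq!(&write_buf.0.bytes[..], &b"de"[..]);
}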