perf(http): utilize writev when possible

By using `AsyncWrite::write_buf`, we can avoid some copies in some
cases. This especially helps throughput for chunked encoding.
This commit is contained in:
Sean McArthur
2018-01-25 12:45:55 -08:00
parent 11b49c2cc8
commit 68377ede70
10 changed files with 679 additions and 500 deletions

View File

@@ -25,6 +25,7 @@ futures = "0.1.17"
futures-cpupool = "0.1.6"
http = { version = "0.1", optional = true }
httparse = "1.0"
iovec = "0.1"
language-tags = "0.2"
log = "0.4"
mime = "0.3.2"

View File

@@ -3,6 +3,7 @@
extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;
extern crate test;
use std::io::{Read, Write};
@@ -17,6 +18,7 @@ use hyper::server::{self, Service};
macro_rules! bench_server {
($b:ident, $header:expr, $body:expr) => ({
let _ = pretty_env_logger::try_init();
let (_until_tx, until_rx) = oneshot::channel();
let addr = {
let (addr_tx, addr_rx) = mpsc::channel();
@@ -53,7 +55,7 @@ macro_rules! bench_server {
sum += tcp.read(&mut buf).unwrap();
}
assert_eq!(sum, total_bytes);
})
});
})
}

View File

@@ -24,6 +24,7 @@ extern crate futures_cpupool;
#[cfg(feature = "compat")]
extern crate http;
extern crate httparse;
extern crate iovec;
extern crate language_tags;
#[macro_use] extern crate log;
pub extern crate mime;

View File

@@ -31,6 +31,12 @@ impl ::std::ops::Deref for Buf {
}
}
impl AsRef<[u8]> for Buf {
fn as_ref(&self) -> &[u8] {
&self.vec
}
}
impl<S: AsRef<[u8]>> PartialEq<S> for Buf {
fn eq(&self, other: &S) -> bool {
self.vec == other.as_ref()
@@ -110,6 +116,13 @@ impl AsyncIo<Buf> {
}
}
impl<S: AsRef<[u8]>, T: AsRef<[u8]>> PartialEq<S> for AsyncIo<T> {
fn eq(&self, other: &S) -> bool {
self.inner.as_ref() == other.as_ref()
}
}
impl<T: Read> Read for AsyncIo<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.blocked = false;
@@ -156,6 +169,46 @@ impl<T: Read + Write> AsyncWrite for AsyncIo<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
fn write_buf<B: ::bytes::Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
use futures::Async;
let r = {
static DUMMY: &[u8] = &[0];
let mut bufs = [From::from(DUMMY); 64];
let i = ::bytes::Buf::bytes_vec(&buf, &mut bufs);
let mut n = 0;
let mut ret = Ok(0);
for iovec in &bufs[..i] {
match self.write(iovec) {
Ok(num) => {
n += num;
ret = Ok(n);
},
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
if let Ok(0) = ret {
ret = Err(e);
}
} else {
ret = Err(e);
}
break;
}
}
}
ret
};
match r {
Ok(n) => {
::bytes::Buf::advance(buf, n);
Ok(Async::Ready(n))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
Ok(Async::NotReady)
}
Err(e) => Err(e),
}
}
}
impl ::std::ops::Deref for AsyncIo<Buf> {

View File

@@ -1,5 +1,5 @@
use std::fmt;
use std::io::{self, Write};
use std::io::{self};
use std::marker::PhantomData;
use futures::{Async, AsyncSink, Poll, StartSend};
@@ -12,7 +12,7 @@ use tokio_proto::streaming::pipeline::{Frame, Transport};
use proto::Http1Transaction;
use super::io::{Cursor, Buffered};
use super::h1::{Encoder, Decoder};
use super::h1::{EncodedBuf, Encoder, Decoder};
use method::Method;
use version::HttpVersion;
@@ -25,8 +25,8 @@ use version::HttpVersion;
/// determine if this connection can be kept alive after the message,
/// or if it is complete.
pub struct Conn<I, B, T, K = KA> {
io: Buffered<I>,
state: State<B, K>,
io: Buffered<I, EncodedBuf<Cursor<B>>>,
state: State<K>,
_marker: PhantomData<T>
}
@@ -103,8 +103,6 @@ where I: AsyncRead + AsyncWrite,
error: err,
}))),
};
} else if self.can_write_continue() {
try_nb!(self.flush());
} else if self.can_read_body() {
return self.read_body()
.map(|async| async.map(|chunk| Some(Frame::Body {
@@ -153,13 +151,6 @@ where I: AsyncRead + AsyncWrite,
}
}
pub fn can_write_continue(&self) -> bool {
match self.state.writing {
Writing::Continue(..) => true,
_ => false,
}
}
pub fn can_read_body(&self) -> bool {
match self.state.reading {
Reading::Body(..) => true,
@@ -228,7 +219,7 @@ where I: AsyncRead + AsyncWrite,
self.state.busy();
if head.expecting_continue() {
let msg = b"HTTP/1.1 100 Continue\r\n\r\n";
self.state.writing = Writing::Continue(Cursor::new(msg));
self.io.write_buf_mut().extend_from_slice(msg);
}
let wants_keep_alive = head.should_keep_alive();
self.state.keep_alive &= wants_keep_alive;
@@ -370,9 +361,7 @@ where I: AsyncRead + AsyncWrite,
};
match self.state.writing {
Writing::Continue(..) |
Writing::Body(..) |
Writing::Ending(..) => return,
Writing::Body(..) => return,
Writing::Init |
Writing::KeepAlive |
Writing::Closed => (),
@@ -408,7 +397,7 @@ where I: AsyncRead + AsyncWrite,
pub fn can_write_head(&self) -> bool {
match self.state.writing {
Writing::Continue(..) | Writing::Init => true,
Writing::Init => true,
_ => false
}
}
@@ -416,19 +405,14 @@ where I: AsyncRead + AsyncWrite,
pub fn can_write_body(&self) -> bool {
match self.state.writing {
Writing::Body(..) => true,
Writing::Continue(..) |
Writing::Init |
Writing::Ending(..) |
Writing::KeepAlive |
Writing::Closed => false,
}
}
pub fn has_queued_body(&self) -> bool {
match self.state.writing {
Writing::Body(_, Some(_)) => true,
_ => false,
}
pub fn can_buffer_body(&self) -> bool {
self.io.can_buffer()
}
pub fn write_head(&mut self, mut head: super::MessageHead<T::Outgoing>, body: bool) {
@@ -437,17 +421,10 @@ where I: AsyncRead + AsyncWrite,
self.enforce_version(&mut head);
let buf = self.io.write_buf_mut();
// if a 100-continue has started but not finished sending, tack the
// remainder on to the start of the buffer.
if let Writing::Continue(ref pending) = self.state.writing {
if pending.has_started() {
buf.extend_from_slice(pending.buf());
}
}
self.state.writing = match T::encode(head, body, &mut self.state.method, buf) {
Ok(encoder) => {
if !encoder.is_eof() {
Writing::Body(encoder, None)
Writing::Body(encoder)
} else {
Writing::KeepAlive
}
@@ -492,46 +469,26 @@ where I: AsyncRead + AsyncWrite,
pub fn write_body(&mut self, chunk: Option<B>) -> StartSend<Option<B>, io::Error> {
debug_assert!(self.can_write_body());
if self.has_queued_body() {
try!(self.flush());
if !self.can_write_body() {
if chunk.as_ref().map(|c| c.as_ref().len()).unwrap_or(0) == 0 {
return Ok(AsyncSink::NotReady(chunk));
} else {
if !self.can_buffer_body() {
if let Async::NotReady = self.flush()? {
// if chunk is Some(&[]), aka empty, whatever, just skip it
if chunk.as_ref().map(|c| c.as_ref().is_empty()).unwrap_or(false) {
return Ok(AsyncSink::Ready);
} else {
return Ok(AsyncSink::NotReady(chunk));
}
}
}
let state = match self.state.writing {
Writing::Body(ref mut encoder, ref mut queued) => {
if queued.is_some() {
return Ok(AsyncSink::NotReady(chunk));
}
Writing::Body(ref mut encoder) => {
if let Some(chunk) = chunk {
if chunk.as_ref().is_empty() {
return Ok(AsyncSink::Ready);
}
let mut cursor = Cursor::new(chunk);
match encoder.encode(&mut self.io, cursor.buf()) {
Ok(n) => {
cursor.consume(n);
if !cursor.is_written() {
trace!("Conn::start_send frame not written, queued");
*queued = Some(cursor);
}
},
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => {
trace!("Conn::start_send frame not written, queued");
*queued = Some(cursor);
},
_ => return Err(e)
}
}
let encoded = encoder.encode(Cursor::new(chunk));
self.io.buffer(encoded);
if encoder.is_eof() {
Writing::KeepAlive
@@ -541,8 +498,12 @@ where I: AsyncRead + AsyncWrite,
} else {
// end of stream, that means we should try to eof
match encoder.end() {
Ok(Some(end)) => Writing::Ending(Cursor::new(end)),
Ok(None) => Writing::KeepAlive,
Ok(end) => {
if let Some(end) = end {
self.io.buffer(end);
}
Writing::KeepAlive
},
Err(_not_eof) => Writing::Closed,
}
}
@@ -575,61 +536,11 @@ where I: AsyncRead + AsyncWrite,
Err(err)
}
fn write_queued(&mut self) -> Poll<(), io::Error> {
trace!("Conn::write_queued()");
let state = match self.state.writing {
Writing::Continue(ref mut queued) => {
let n = self.io.buffer(queued.buf());
queued.consume(n);
if queued.is_written() {
Writing::Init
} else {
return Ok(Async::NotReady);
}
}
Writing::Body(ref mut encoder, ref mut queued) => {
let complete = if let Some(chunk) = queued.as_mut() {
let n = try_nb!(encoder.encode(&mut self.io, chunk.buf()));
chunk.consume(n);
chunk.is_written()
} else {
true
};
trace!("Conn::write_queued complete = {}", complete);
return if complete {
*queued = None;
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
};
},
Writing::Ending(ref mut ending) => {
let n = self.io.buffer(ending.buf());
ending.consume(n);
if ending.is_written() {
Writing::KeepAlive
} else {
return Ok(Async::NotReady);
}
},
_ => return Ok(Async::Ready(())),
};
self.state.writing = state;
Ok(Async::Ready(()))
}
pub fn flush(&mut self) -> Poll<(), io::Error> {
loop {
let queue_finished = try!(self.write_queued()).is_ready();
try_nb!(self.io.flush());
if queue_finished {
break;
}
}
try_ready!(self.io.flush());
self.try_keep_alive();
trace!("flushed {:?}", self.state);
Ok(Async::Ready(()))
}
pub fn shutdown(&mut self) -> Poll<(), io::Error> {
@@ -740,7 +651,7 @@ where I: AsyncRead + AsyncWrite,
},
};
error!("writing illegal frame; state={:?}, frame={:?}", self.state.writing, DebugFrame(&frame));
warn!("writing illegal frame; state={:?}, frame={:?}", self.state.writing, DebugFrame(&frame));
Err(io::Error::new(io::ErrorKind::InvalidInput, "illegal frame"))
}
@@ -778,13 +689,13 @@ impl<I, B: AsRef<[u8]>, T, K: KeepAlive> fmt::Debug for Conn<I, B, T, K> {
}
}
struct State<B, K> {
struct State<K> {
error: Option<::Error>,
keep_alive: K,
method: Option<Method>,
read_task: Option<Task>,
reading: Reading,
writing: Writing<B>,
writing: Writing,
version: Version,
}
@@ -796,16 +707,14 @@ enum Reading {
Closed,
}
enum Writing<B> {
Continue(Cursor<&'static [u8]>),
enum Writing {
Init,
Body(Encoder, Option<Cursor<B>>),
Ending(Cursor<&'static [u8]>),
Body(Encoder),
KeepAlive,
Closed,
}
impl<B: AsRef<[u8]>, K: KeepAlive> fmt::Debug for State<B, K> {
impl<K: KeepAlive> fmt::Debug for State<K> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("State")
.field("reading", &self.reading)
@@ -818,19 +727,12 @@ impl<B: AsRef<[u8]>, K: KeepAlive> fmt::Debug for State<B, K> {
}
}
impl<B: AsRef<[u8]>> fmt::Debug for Writing<B> {
impl fmt::Debug for Writing {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Writing::Continue(ref buf) => f.debug_tuple("Continue")
.field(buf)
.finish(),
Writing::Init => f.write_str("Init"),
Writing::Body(ref enc, ref queued) => f.debug_tuple("Body")
Writing::Body(ref enc) => f.debug_tuple("Body")
.field(enc)
.field(queued)
.finish(),
Writing::Ending(ref ending) => f.debug_tuple("Ending")
.field(ending)
.finish(),
Writing::KeepAlive => f.write_str("KeepAlive"),
Writing::Closed => f.write_str("Closed"),
@@ -884,7 +786,7 @@ impl KeepAlive for KA {
}
}
impl<B, K: KeepAlive> State<B, K> {
impl<K: KeepAlive> State<K> {
fn close(&mut self) {
trace!("State::close()");
self.reading = Reading::Closed;
@@ -1031,15 +933,6 @@ mod tests {
use std::str::FromStr;
impl<T> Writing<T> {
fn is_queued(&self) -> bool {
match *self {
Writing::Body(_, Some(_)) => true,
_ => false,
}
}
}
#[test]
fn test_conn_init_read() {
let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
@@ -1226,13 +1119,10 @@ mod tests {
let io = AsyncIo::new_buf(vec![], 0);
let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default());
let max = ::proto::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64), None);
conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 8].into()) }).unwrap().is_ready());
assert!(!conn.state.writing.is_queued());
assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; max].into()) }).unwrap().is_ready());
assert!(conn.state.writing.is_queued());
assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
assert!(!conn.can_buffer_body());
assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
@@ -1253,7 +1143,7 @@ mod tests {
let _: Result<(), ()> = future::lazy(|| {
let io = AsyncIo::new_buf(vec![], 4096);
let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.writing = Writing::Body(Encoder::chunked(), None);
conn.state.writing = Writing::Body(Encoder::chunked());
assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
@@ -1266,11 +1156,12 @@ mod tests {
let _: Result<(), ()> = future::lazy(|| {
let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.writing = Writing::Body(Encoder::length(1024 * 1024), None);
conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
assert!(conn.state.writing.is_queued());
assert!(!conn.can_buffer_body());
conn.io.io_mut().block_in(1024 * 1024 * 5);
assert!(conn.poll_complete().unwrap().is_ready());
assert!(!conn.state.writing.is_queued());
assert!(conn.can_buffer_body());
assert!(conn.io.io_mut().flushed());
Ok(())
@@ -1329,7 +1220,7 @@ mod tests {
let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.reading = Reading::KeepAlive;
assert!(conn.poll().unwrap().is_not_ready());
conn.state.writing = Writing::Body(Encoder::length(5_000), None);
conn.state.writing = Writing::Body(Encoder::length(5_000));
assert!(conn.poll_complete().unwrap().is_ready());
Ok::<(), ()>(())
});

View File

@@ -87,8 +87,6 @@ where
return Ok(Async::Ready(()));
} else if self.conn.can_read_head() {
try_ready!(self.poll_read_head());
} else if self.conn.can_write_continue() {
try_nb!(self.conn.flush());
} else if let Some(mut body) = self.body_tx.take() {
if self.conn.can_read_body() {
match body.poll_ready() {
@@ -196,7 +194,7 @@ where
self.close();
return Ok(Async::Ready(()));
}
} else if self.conn.has_queued_body() {
} else if !self.conn.can_buffer_body() {
try_ready!(self.poll_flush());
} else if let Some(mut body) = self.body_rx.take() {
let chunk = match body.poll()? {

View File

@@ -1,7 +1,9 @@
use std::cmp;
use std::io::{self, Write};
//use std::cmp;
use std::fmt;
use proto::io::AtomicWrite;
use bytes::{Buf, IntoBuf};
use bytes::buf::{Chain, Take};
use iovec::IoVec;
/// Encoders to handle different Transfer-Encodings.
#[derive(Debug, Clone)]
@@ -9,10 +11,18 @@ pub struct Encoder {
kind: Kind,
}
#[derive(Debug)]
pub struct EncodedBuf<B> {
kind: BufKind<B>,
}
#[derive(Debug)]
pub struct NotEof;
#[derive(Debug, PartialEq, Clone)]
enum Kind {
/// An Encoder for when Transfer-Encoding includes `chunked`.
Chunked(Chunked),
Chunked,
/// An Encoder for when Content-Length is set.
///
/// Enforces that the body is not longer than the Content-Length header.
@@ -24,10 +34,18 @@ enum Kind {
Eof
}
#[derive(Debug)]
enum BufKind<B> {
Exact(B),
Limited(Take<B>),
Chunked(Chain<ChunkSize, Chain<B, CrLf>>),
ChunkedEnd(CrLf),
}
impl Encoder {
pub fn chunked() -> Encoder {
Encoder {
kind: Kind::Chunked(Chunked::Init),
kind: Kind::Chunked,
}
}
@@ -45,185 +63,103 @@ impl Encoder {
pub fn is_eof(&self) -> bool {
match self.kind {
Kind::Length(0) |
Kind::Chunked(Chunked::End) => true,
Kind::Length(0) => true,
_ => false
}
}
pub fn end(&self) -> Result<Option<&'static [u8]>, NotEof> {
pub fn end<B>(&self) -> Result<Option<EncodedBuf<B>>, NotEof> {
match self.kind {
Kind::Length(0) => Ok(None),
Kind::Chunked(Chunked::Init) => Ok(Some(b"0\r\n\r\n")),
Kind::Chunked => Ok(Some(EncodedBuf {
kind: BufKind::ChunkedEnd(CrLf(b"0\r\n\r\n")),
})),
_ => Err(NotEof),
}
}
pub fn encode<W: AtomicWrite>(&mut self, w: &mut W, msg: &[u8]) -> io::Result<usize> {
match self.kind {
Kind::Chunked(ref mut chunked) => {
chunked.encode(w, msg)
pub fn encode<B>(&mut self, msg: B) -> EncodedBuf<B::Buf>
where
B: IntoBuf,
{
let msg = msg.into_buf();
let len = msg.remaining();
assert!(len > 0, "encode() called with empty buf");
let buf = match self.kind {
Kind::Chunked => {
trace!("encoding chunked {}B", len);
BufKind::Chunked(ChunkSize::new(len)
.chain(msg.chain(CrLf(b"\r\n"))))
},
Kind::Length(ref mut remaining) => {
if msg.is_empty() {
return Ok(0);
trace!("sized write, len = {}", len);
if len as u64 > *remaining {
let limit = *remaining as usize;
*remaining = 0;
BufKind::Limited(msg.take(limit))
} else {
*remaining -= len as u64;
BufKind::Exact(msg)
}
let n = {
let max = cmp::min(*remaining as usize, msg.len());
trace!("sized write = {}", max);
let slice = &msg[..max];
try!(w.write_atomic(&[slice]))
};
if n == 0 {
return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
}
*remaining -= n as u64;
trace!("encoded {} bytes, remaining = {}", n, remaining);
Ok(n)
},
Kind::Eof => {
if msg.is_empty() {
return Ok(0);
}
w.write_atomic(&[msg])
trace!("eof write {}B", len);
BufKind::Exact(msg)
}
}
}
}
#[derive(Debug)]
pub struct NotEof;
#[derive(Debug, PartialEq, Clone)]
enum Chunked {
Init,
Size(ChunkSize),
SizeCr,
SizeLf,
Body(usize),
BodyCr,
BodyLf,
End,
}
impl Chunked {
fn encode<W: AtomicWrite>(&mut self, w: &mut W, msg: &[u8]) -> io::Result<usize> {
match *self {
Chunked::Init => {
let mut size = ChunkSize {
bytes: [0; CHUNK_SIZE_MAX_BYTES],
pos: 0,
len: 0,
};
trace!("chunked write, size = {:?}", msg.len());
write!(&mut size, "{:X}", msg.len())
.expect("CHUNK_SIZE_MAX_BYTES should fit any usize");
*self = Chunked::Size(size);
}
Chunked::End => return Ok(0),
_ => {}
}
let mut n = {
let pieces = match *self {
Chunked::Init => unreachable!("Chunked::Init should have become Chunked::Size"),
Chunked::Size(ref size) => [
&size.bytes[size.pos.into() .. size.len.into()],
&b"\r\n"[..],
msg,
&b"\r\n"[..],
],
Chunked::SizeCr => [
&b""[..],
&b"\r\n"[..],
msg,
&b"\r\n"[..],
],
Chunked::SizeLf => [
&b""[..],
&b"\n"[..],
msg,
&b"\r\n"[..],
],
Chunked::Body(pos) => [
&b""[..],
&b""[..],
&msg[pos..],
&b"\r\n"[..],
],
Chunked::BodyCr => [
&b""[..],
&b""[..],
&b""[..],
&b"\r\n"[..],
],
Chunked::BodyLf => [
&b""[..],
&b""[..],
&b""[..],
&b"\n"[..],
],
Chunked::End => unreachable!("Chunked::End shouldn't write more")
};
try!(w.write_atomic(&pieces))
};
while n > 0 {
match *self {
Chunked::Init => unreachable!("Chunked::Init should have become Chunked::Size"),
Chunked::Size(mut size) => {
n = size.update(n);
if size.len == 0 {
*self = Chunked::SizeCr;
} else {
*self = Chunked::Size(size);
}
},
Chunked::SizeCr => {
*self = Chunked::SizeLf;
n -= 1;
}
Chunked::SizeLf => {
*self = Chunked::Body(0);
n -= 1;
}
Chunked::Body(pos) => {
let left = msg.len() - pos;
if n >= left {
*self = Chunked::BodyCr;
n -= left;
} else {
*self = Chunked::Body(pos + n);
n = 0;
}
}
Chunked::BodyCr => {
*self = Chunked::BodyLf;
n -= 1;
}
Chunked::BodyLf => {
assert!(n == 1);
*self = if msg.len() == 0 {
Chunked::End
} else {
Chunked::Init
};
n = 0;
},
Chunked::End => unreachable!("Chunked::End shouldn't have any to write")
}
}
match *self {
Chunked::Init |
Chunked::End => Ok(msg.len()),
_ => Err(io::ErrorKind::WouldBlock.into())
EncodedBuf {
kind: buf,
}
}
}
impl<B> Buf for EncodedBuf<B>
where
B: Buf,
{
#[inline]
fn remaining(&self) -> usize {
match self.kind {
BufKind::Exact(ref b) => b.remaining(),
BufKind::Limited(ref b) => b.remaining(),
BufKind::Chunked(ref b) => b.remaining(),
BufKind::ChunkedEnd(ref b) => b.remaining(),
}
}
#[inline]
fn bytes(&self) -> &[u8] {
match self.kind {
BufKind::Exact(ref b) => b.bytes(),
BufKind::Limited(ref b) => b.bytes(),
BufKind::Chunked(ref b) => b.bytes(),
BufKind::ChunkedEnd(ref b) => b.bytes(),
}
}
#[inline]
fn advance(&mut self, cnt: usize) {
match self.kind {
BufKind::Exact(ref mut b) => b.advance(cnt),
BufKind::Limited(ref mut b) => b.advance(cnt),
BufKind::Chunked(ref mut b) => b.advance(cnt),
BufKind::ChunkedEnd(ref mut b) => b.advance(cnt),
}
}
#[inline]
fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize {
match self.kind {
BufKind::Exact(ref b) => b.bytes_vec(dst),
BufKind::Limited(ref b) => b.bytes_vec(dst),
BufKind::Chunked(ref b) => b.bytes_vec(dst),
BufKind::ChunkedEnd(ref b) => b.bytes_vec(dst),
}
}
}
#[cfg(target_pointer_width = "32")]
const USIZE_BYTES: usize = 4;
@@ -235,27 +171,45 @@ const CHUNK_SIZE_MAX_BYTES: usize = USIZE_BYTES * 2;
#[derive(Clone, Copy)]
struct ChunkSize {
bytes: [u8; CHUNK_SIZE_MAX_BYTES],
bytes: [u8; CHUNK_SIZE_MAX_BYTES + 2],
pos: u8,
len: u8,
}
impl ChunkSize {
fn update(&mut self, n: usize) -> usize {
let diff = (self.len - self.pos).into();
if n >= diff {
self.pos = 0;
self.len = 0;
n - diff
} else {
self.pos += n as u8; // just verified it was a small usize
0
}
fn new(len: usize) -> ChunkSize {
use std::fmt::Write;
let mut size = ChunkSize {
bytes: [0; CHUNK_SIZE_MAX_BYTES + 2],
pos: 0,
len: 0,
};
write!(&mut size, "{:X}\r\n", len)
.expect("CHUNK_SIZE_MAX_BYTES should fit any usize");
size
}
}
impl ::std::fmt::Debug for ChunkSize {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
impl Buf for ChunkSize {
#[inline]
fn remaining(&self) -> usize {
(self.len - self.pos).into()
}
#[inline]
fn bytes(&self) -> &[u8] {
&self.bytes[self.pos.into() .. self.len.into()]
}
#[inline]
fn advance(&mut self, cnt: usize) {
assert!(cnt <= self.remaining());
self.pos += cnt as u8; // just asserted cnt fits in u8
}
}
impl fmt::Debug for ChunkSize {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("ChunkSize")
.field("bytes", &&self.bytes[..self.len.into()])
.field("pos", &self.pos)
@@ -263,64 +217,122 @@ impl ::std::fmt::Debug for ChunkSize {
}
}
impl ::std::cmp::PartialEq for ChunkSize {
fn eq(&self, other: &ChunkSize) -> bool {
self.len == other.len &&
self.pos == other.pos &&
(&self.bytes[..]) == (&other.bytes[..])
impl fmt::Write for ChunkSize {
fn write_str(&mut self, num: &str) -> fmt::Result {
use std::io::Write;
(&mut self.bytes[self.len.into()..]).write(num.as_bytes())
.expect("&mut [u8].write() cannot error");
self.len += num.len() as u8; // safe because bytes is never bigger than 256
Ok(())
}
}
impl io::Write for ChunkSize {
fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
let n = (&mut self.bytes[self.len.into() ..]).write(msg)
.expect("&mut [u8].write() cannot error");
self.len += n as u8; // safe because bytes is never bigger than 256
Ok(n)
#[derive(Debug)]
struct CrLf(&'static [u8]);
impl Buf for CrLf {
#[inline]
fn remaining(&self) -> usize {
self.0.len()
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
#[inline]
fn bytes(&self) -> &[u8] {
self.0
}
#[inline]
fn advance(&mut self, cnt: usize) {
self.0 = &self.0[cnt..];
}
#[inline]
fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize {
if dst.is_empty() {
return 0;
} else {
dst[0] = self.0.into();
return 1;
}
}
}
#[cfg(test)]
mod tests {
use bytes::{BufMut};
use proto::io::Cursor;
use super::Encoder;
use mock::{AsyncIo, Buf};
#[test]
fn test_chunked_encode_sync() {
let mut dst = Buf::new();
fn chunked() {
let mut encoder = Encoder::chunked();
let mut dst = Vec::new();
encoder.encode(&mut dst, b"foo bar").unwrap();
encoder.encode(&mut dst, b"baz quux herp").unwrap();
encoder.encode(&mut dst, b"").unwrap();
assert_eq!(&dst[..], &b"7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n"[..]);
let msg1 = b"foo bar".as_ref();
let buf1 = encoder.encode(msg1);
dst.put(buf1);
assert_eq!(dst, b"7\r\nfoo bar\r\n");
let msg2 = b"baz quux herp".as_ref();
let buf2 = encoder.encode(msg2);
dst.put(buf2);
assert_eq!(dst, b"7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n");
let end = encoder.end::<Cursor<Vec<u8>>>().unwrap().unwrap();
dst.put(end);
assert_eq!(dst, b"7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n".as_ref());
}
#[test]
fn test_chunked_encode_async() {
let mut dst = AsyncIo::new(Buf::new(), 7);
let mut encoder = Encoder::chunked();
fn length() {
let max_len = 8;
let mut encoder = Encoder::length(max_len as u64);
let mut dst = Vec::new();
assert!(encoder.encode(&mut dst, b"foo bar").is_err());
dst.block_in(6);
assert_eq!(7, encoder.encode(&mut dst, b"foo bar").unwrap());
dst.block_in(30);
assert_eq!(13, encoder.encode(&mut dst, b"baz quux herp").unwrap());
encoder.encode(&mut dst, b"").unwrap();
assert_eq!(&dst[..], &b"7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n"[..]);
}
#[test]
fn test_sized_encode() {
let mut dst = Buf::new();
let mut encoder = Encoder::length(8);
encoder.encode(&mut dst, b"foo bar").unwrap();
assert_eq!(encoder.encode(&mut dst, b"baz").unwrap(), 1);
let msg1 = b"foo bar".as_ref();
let buf1 = encoder.encode(msg1);
dst.put(buf1);
assert_eq!(dst, b"foo bar");
assert!(!encoder.is_eof());
encoder.end::<()>().unwrap_err();
let msg2 = b"baz".as_ref();
let buf2 = encoder.encode(msg2);
dst.put(buf2);
assert_eq!(dst.len(), max_len);
assert_eq!(dst, b"foo barb");
assert!(encoder.is_eof());
assert!(encoder.end::<()>().unwrap().is_none());
}
#[test]
fn eof() {
let mut encoder = Encoder::eof();
let mut dst = Vec::new();
let msg1 = b"foo bar".as_ref();
let buf1 = encoder.encode(msg1);
dst.put(buf1);
assert_eq!(dst, b"foo bar");
assert!(!encoder.is_eof());
encoder.end::<()>().unwrap_err();
let msg2 = b"baz".as_ref();
let buf2 = encoder.encode(msg2);
dst.put(buf2);
assert_eq!(dst, b"foo barbaz");
assert!(!encoder.is_eof());
encoder.end::<()>().unwrap_err();
}
}

View File

@@ -1,5 +1,5 @@
pub use self::decode::Decoder;
pub use self::encode::Encoder;
pub use self::encode::{EncodedBuf, Encoder};
mod date;
mod decode;

View File

@@ -1,27 +1,31 @@
use std::cmp;
use std::collections::VecDeque;
use std::fmt;
use std::io::{self, Write};
use std::ptr;
use std::io;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{Async, Poll};
use iovec::IoVec;
use tokio_io::{AsyncRead, AsyncWrite};
use super::{Http1Transaction, MessageHead};
use bytes::{BytesMut, Bytes};
const INIT_BUFFER_SIZE: usize = 8192;
pub const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
const MAX_BUF_LIST_BUFFERS: usize = 16;
pub struct Buffered<T> {
pub struct Buffered<T, B> {
flush_pipeline: bool,
io: T,
max_buf_size: usize,
read_blocked: bool,
read_buf: BytesMut,
write_buf: WriteBuf,
write_buf: WriteBuf<B>,
}
impl<T> fmt::Debug for Buffered<T> {
impl<T, B> fmt::Debug for Buffered<T, B>
where
B: Buf,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Buffered")
.field("read_buf", &self.read_buf)
@@ -30,8 +34,12 @@ impl<T> fmt::Debug for Buffered<T> {
}
}
impl<T: AsyncRead + AsyncWrite> Buffered<T> {
pub fn new(io: T) -> Buffered<T> {
impl<T, B> Buffered<T, B>
where
T: AsyncRead + AsyncWrite,
B: Buf,
{
pub fn new(io: T) -> Buffered<T, B> {
Buffered {
flush_pipeline: false,
io: io,
@@ -44,6 +52,11 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
pub fn set_flush_pipeline(&mut self, enabled: bool) {
self.flush_pipeline = enabled;
self.write_buf.set_strategy(if enabled {
Strategy::Flatten
} else {
Strategy::Queue
});
}
pub fn set_max_buf_size(&mut self, max: usize) {
@@ -56,9 +69,17 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
}
pub fn write_buf_mut(&mut self) -> &mut Vec<u8> {
self.write_buf.maybe_reset();
self.write_buf.maybe_reserve(0);
&mut self.write_buf.buf.bytes
let buf = self.write_buf.head_mut();
buf.maybe_reset();
&mut buf.bytes
}
pub fn buffer(&mut self, buf: B) {
self.write_buf.buffer(buf)
}
pub fn can_buffer(&self) -> bool {
self.flush_pipeline || self.write_buf.can_buffer()
}
pub fn consume_leading_lines(&mut self) {
@@ -118,10 +139,6 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
})
}
pub fn buffer<B: AsRef<[u8]>>(&mut self, buf: B) -> usize {
self.write_buf.buffer(buf.as_ref())
}
pub fn io_mut(&mut self) -> &mut T {
&mut self.io
}
@@ -129,33 +146,23 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
pub fn is_read_blocked(&self) -> bool {
self.read_blocked
}
}
impl<T: Write> Write for Buffered<T> {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
let n = self.write_buf.buffer(data);
if n == 0 {
Err(io::ErrorKind::WouldBlock.into())
} else {
Ok(n)
}
}
fn flush(&mut self) -> io::Result<()> {
pub fn flush(&mut self) -> Poll<(), io::Error> {
if self.flush_pipeline && !self.read_buf.is_empty() {
Ok(())
//Ok(())
} else if self.write_buf.remaining() == 0 {
self.io.flush()
try_nb!(self.io.flush());
} else {
loop {
let n = try!(self.write_buf.write_into(&mut self.io));
let n = try_ready!(self.io.write_buf(&mut self.write_buf));
debug!("flushed {} bytes", n);
if self.write_buf.remaining() == 0 {
break;
}
}
self.io.flush()
try_nb!(self.io.flush())
}
Ok(Async::Ready(()))
}
}
@@ -163,7 +170,11 @@ pub trait MemRead {
fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error>;
}
impl<T: AsyncRead + AsyncWrite> MemRead for Buffered<T> {
impl<T, B> MemRead for Buffered<T, B>
where
T: AsyncRead + AsyncWrite,
B: Buf,
{
fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
trace!("Buffered.read_mem read_buf={}, wanted={}", self.read_buf.len(), len);
if !self.read_buf.is_empty() {
@@ -191,30 +202,6 @@ impl<T: AsRef<[u8]>> Cursor<T> {
}
}
pub fn has_started(&self) -> bool {
self.pos != 0
}
pub fn is_written(&self) -> bool {
trace!("Cursor::is_written pos = {}, len = {}", self.pos, self.bytes.as_ref().len());
self.pos >= self.bytes.as_ref().len()
}
pub fn write_to<W: Write>(&mut self, dst: &mut W) -> io::Result<usize> {
if self.remaining() == 0 {
Ok(0)
} else {
dst.write(&self.bytes.as_ref()[self.pos..]).map(|n| {
self.pos += n;
n
})
}
}
fn remaining(&self) -> usize {
self.bytes.as_ref().len() - self.pos
}
#[inline]
pub fn buf(&self) -> &[u8] {
&self.bytes.as_ref()[self.pos..]
@@ -222,8 +209,18 @@ impl<T: AsRef<[u8]>> Cursor<T> {
#[inline]
pub fn consume(&mut self, num: usize) {
trace!("Cursor::consume({})", num);
self.pos = ::std::cmp::min(self.bytes.as_ref().len(), self.pos + num);
self.pos += num;
}
}
impl Cursor<Vec<u8>> {
fn maybe_reset(&mut self) {
if self.pos != 0 && self.remaining() == 0 {
self.pos = 0;
unsafe {
self.bytes.set_len(0);
}
}
}
}
@@ -236,98 +233,239 @@ impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
}
}
pub trait AtomicWrite {
fn write_atomic(&mut self, data: &[&[u8]]) -> io::Result<usize>;
}
/*
#[cfg(not(windows))]
impl<T: Write + ::vecio::Writev> AtomicWrite for T {
fn write_atomic(&mut self, bufs: &[&[u8]]) -> io::Result<usize> {
self.writev(bufs)
impl<T: AsRef<[u8]>> Buf for Cursor<T> {
#[inline]
fn remaining(&self) -> usize {
self.bytes.as_ref().len() - self.pos
}
}
#[inline]
fn bytes(&self) -> &[u8] {
self.buf()
}
#[cfg(windows)]
*/
impl<T: Write> AtomicWrite for T {
fn write_atomic(&mut self, bufs: &[&[u8]]) -> io::Result<usize> {
if bufs.len() == 1 {
self.write(bufs[0])
} else {
let vec = bufs.concat();
self.write(&vec)
}
#[inline]
fn advance(&mut self, cnt: usize) {
self.consume(cnt)
}
}
//}
// an internal buffer to collect writes before flushes
#[derive(Debug)]
struct WriteBuf{
buf: Cursor<Vec<u8>>,
struct WriteBuf<B> {
buf: BufDeque<B>,
max_buf_size: usize,
strategy: Strategy,
}
impl WriteBuf {
fn new() -> WriteBuf {
impl<B> WriteBuf<B> {
fn new() -> WriteBuf<B> {
WriteBuf {
buf: Cursor::new(Vec::new()),
buf: BufDeque::new(),
max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
strategy: Strategy::Queue,
}
}
}
impl<B> WriteBuf<B>
where
B: Buf,
{
fn set_strategy(&mut self, strategy: Strategy) {
self.strategy = strategy;
}
fn buffer(&mut self, buf: B) {
match self.strategy {
Strategy::Flatten => {
let head = self.head_mut();
head.maybe_reset();
head.bytes.put(buf);
},
Strategy::Queue => {
self.buf.bufs.push_back(VecOrBuf::Buf(buf));
},
}
}
fn write_into<W: Write>(&mut self, w: &mut W) -> io::Result<usize> {
self.buf.write_to(w)
}
fn buffer(&mut self, data: &[u8]) -> usize {
trace!("WriteBuf::buffer() len = {:?}", data.len());
self.maybe_reset();
self.maybe_reserve(data.len());
let vec = &mut self.buf.bytes;
let len = cmp::min(vec.capacity() - vec.len(), data.len());
assert!(vec.capacity() - vec.len() >= len);
unsafe {
// in rust 1.9, we could use slice::copy_from_slice
ptr::copy(
data.as_ptr(),
vec.as_mut_ptr().offset(vec.len() as isize),
len
);
let new_len = vec.len() + len;
vec.set_len(new_len);
fn can_buffer(&self) -> bool {
match self.strategy {
Strategy::Flatten => {
self.remaining() < self.max_buf_size
},
Strategy::Queue => {
// for now, the simplest of heuristics
self.buf.bufs.len() < MAX_BUF_LIST_BUFFERS
&& self.remaining() < self.max_buf_size
},
}
len
}
/// Returns a mutable handle to the flatten-target `Vec` buffer at the back
/// of the queue, pushing a fresh one if the back entry is not a `Vec`.
///
/// The check-then-push-then-borrow sequence is split into separate steps to
/// satisfy the borrow checker (the immutable `back()` probe must end before
/// `push_back`/`back_mut` take a mutable borrow).
fn head_mut(&mut self) -> &mut Cursor<Vec<u8>> {
    let back_is_vec = match self.buf.bufs.back() {
        Some(&VecOrBuf::Vec(_)) => true,
        _ => false,
    };
    if !back_is_vec {
        let fresh = Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE));
        self.buf.bufs.push_back(VecOrBuf::Vec(fresh));
    }
    match self.buf.bufs.back_mut() {
        Some(&mut VecOrBuf::Vec(ref mut v)) => v,
        _ => unreachable!("head_buf just pushed on back"),
    }
}
}
/// Debug output intentionally omits the buffered bytes themselves; only the
/// total remaining length and the active strategy are shown.
impl<B: Buf> fmt::Debug for WriteBuf<B> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut s = f.debug_struct("WriteBuf");
        s.field("remaining", &self.remaining());
        s.field("strategy", &self.strategy);
        s.finish()
    }
}
// NOTE(review): this span appears to be diff residue — the body of an older
// `maybe_reserve` method is spliced in where `bytes()` should begin, so the
// impl as shown does not parse. Confirm against the upstream commit before use.
impl<B: Buf> Buf for WriteBuf<B> {
#[inline]
// Total unread bytes across the whole queue; delegates to BufDeque.
fn remaining(&self) -> usize {
self.buf.remaining()
}
#[inline]
// NOTE(review): from here down to the second trace! looks like leftover
// `maybe_reserve` code from the pre-refactor flatten-only WriteBuf.
fn maybe_reserve(&mut self, needed: usize) {
let vec = &mut self.buf.bytes;
let cap = vec.capacity();
if cap == 0 {
// First reservation: at least INIT_BUFFER_SIZE, capped at max_buf_size.
let init = cmp::min(self.max_buf_size, cmp::max(INIT_BUFFER_SIZE, needed));
trace!("WriteBuf reserving initial {}", init);
vec.reserve(init);
} else if cap < self.max_buf_size {
// Grow toward max_buf_size without overshooting it.
vec.reserve(cmp::min(needed, self.max_buf_size - cap));
trace!("WriteBuf reserved {}", vec.capacity() - cap);
// NOTE(review): `bytes` is missing its #[inline] and opening context here —
// presumably the residue above displaced them.
fn bytes(&self) -> &[u8] {
self.buf.bytes()
}
#[inline]
// Consume `cnt` bytes; delegates to the queue's advance.
fn advance(&mut self, cnt: usize) {
self.buf.advance(cnt)
}
#[inline]
// Fill `dst` with IoVec slices for a vectored write; delegates to the queue.
fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize {
self.buf.bytes_vec(dst)
}
}
/// How `WriteBuf` stores outgoing data.
///
/// `Flatten` copies every buffered chunk into one contiguous `Vec`, which
/// favors a single plain `write`. `Queue` keeps each chunk as its own entry
/// so a vectored (`writev`-style) write can send them without copying.
//
// Derives extended with Clone/Copy/PartialEq/Eq: the enum is a two-variant
// fieldless flag, so value semantics and comparison are free and make the
// strategy easier to pass around and test.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Strategy {
    Flatten,
    Queue,
}
// A queue entry in BufDeque: either an owned, reusable byte Vec (used by the
// Flatten strategy and for headers) or a caller-supplied body buf `B` that is
// queued as-is for vectored writes.
enum VecOrBuf<B> {
Vec(Cursor<Vec<u8>>),
Buf(B),
}
// Forwards every Buf method to whichever variant is held.
// NOTE(review): a fragment of WriteBuf's old `maybe_reset` is spliced into
// this impl between `remaining` and `bytes` — diff residue; the impl as shown
// does not parse. Confirm against the upstream commit.
impl<B: Buf> Buf for VecOrBuf<B> {
#[inline]
fn remaining(&self) -> usize {
match *self {
VecOrBuf::Vec(ref v) => v.remaining(),
VecOrBuf::Buf(ref b) => b.remaining(),
}
}
// NOTE(review): `maybe_reset` below belongs to the pre-refactor WriteBuf —
// it rewinds a fully-consumed cursor so its Vec can be reused.
fn maybe_reset(&mut self) {
if self.buf.pos != 0 && self.buf.remaining() == 0 {
self.buf.pos = 0;
unsafe {
// SAFETY (as in upstream): truncating len to 0 never exposes
// uninitialized memory.
self.buf.bytes.set_len(0);
#[inline]
fn bytes(&self) -> &[u8] {
match *self {
VecOrBuf::Vec(ref v) => v.bytes(),
VecOrBuf::Buf(ref b) => b.bytes(),
}
}
#[inline]
fn advance(&mut self, cnt: usize) {
match *self {
VecOrBuf::Vec(ref mut v) => v.advance(cnt),
VecOrBuf::Buf(ref mut b) => b.advance(cnt),
}
}
#[inline]
fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize {
match *self {
VecOrBuf::Vec(ref v) => v.bytes_vec(dst),
VecOrBuf::Buf(ref b) => b.bytes_vec(dst),
}
}
}
/// FIFO of pending write buffers; fronts are drained first, and spent `Vec`
/// entries are recycled to the back (see `advance`).
struct BufDeque<T> {
    bufs: VecDeque<VecOrBuf<T>>,
}

impl<T> BufDeque<T> {
    /// Creates an empty queue.
    fn new() -> BufDeque<T> {
        BufDeque { bufs: VecDeque::new() }
    }
}
impl<T: Buf> Buf for BufDeque<T> {
    /// Sum of unread bytes across every queued buffer.
    #[inline]
    fn remaining(&self) -> usize {
        let mut total = 0;
        for buf in &self.bufs {
            total += buf.remaining();
        }
        total
    }

    /// Bytes of the front buffer only (per the `Buf` contract, callers use
    /// `advance` to move on to later buffers).
    #[inline]
    fn bytes(&self) -> &[u8] {
        match self.bufs.front() {
            Some(buf) => buf.bytes(),
            None => &[],
        }
    }

    /// Consumes `cnt` bytes from the front, popping exhausted buffers.
    /// If the last popped entry was a `Vec`, it is pushed to the back so its
    /// allocation can be reused instead of dropped.
    #[inline]
    fn advance(&mut self, mut cnt: usize) {
        let mut reclaimed = None;
        while cnt > 0 {
            {
                // Indexing panics if `cnt` exceeds `remaining()`, matching
                // the usual Buf::advance contract.
                let front = &mut self.bufs[0];
                let rem = front.remaining();
                if rem > cnt {
                    front.advance(cnt);
                    return;
                }
                front.advance(rem);
                cnt -= rem;
            }
            reclaimed = self.bufs.pop_front();
        }
        if let Some(VecOrBuf::Vec(v)) = reclaimed {
            trace!("reclaiming write buf Vec");
            self.bufs.push_back(VecOrBuf::Vec(v));
        }
    }

    /// Gathers IoVec slices from the queued buffers, in order, until `dst`
    /// is full; this is what enables the writev fast path.
    #[inline]
    fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize {
        if dst.is_empty() {
            return 0;
        }
        let mut filled = 0;
        for buf in &self.bufs {
            filled += buf.bytes_vec(&mut dst[filled..]);
            if filled == dst.len() {
                break;
            }
        }
        filled
    }
}
@@ -351,7 +489,7 @@ fn test_iobuf_write_empty_slice() {
let mut mock = AsyncIo::new(MockBuf::new(), 256);
mock.error(io::Error::new(io::ErrorKind::Other, "logic error"));
let mut io_buf = Buffered::new(mock);
let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
// underlying io will return the logic error upon write,
// so we are testing that the io_buf does not trigger a write
@@ -366,7 +504,7 @@ fn test_parse_reads_until_blocked() {
let raw = "HTTP/1.1 200 OK\r\n";
let mock = AsyncIo::new(MockBuf::wrap(raw.into()), raw.len());
let mut buffered = Buffered::new(mock);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
assert_eq!(buffered.parse::<super::ClientTransaction>().unwrap(), Async::NotReady);
assert!(buffered.io.blocked());
}

View File

@@ -23,6 +23,7 @@ use std::thread;
use std::time::Duration;
use hyper::StatusCode;
use hyper::header::ContentLength;
use hyper::server::{Http, Request, Response, Service, NewService, service_fn};
@@ -630,8 +631,14 @@ fn expect_continue() {
fn pipeline_disabled() {
let server = serve();
let mut req = connect(server.addr());
server.reply().status(hyper::Ok);
server.reply().status(hyper::Ok);
server.reply()
.status(hyper::Ok)
.header(ContentLength(12))
.body("Hello World!");
server.reply()
.status(hyper::Ok)
.header(ContentLength(12))
.body("Hello World!");
req.write_all(b"\
GET / HTTP/1.1\r\n\
@@ -671,8 +678,14 @@ fn pipeline_enabled() {
.. Default::default()
});
let mut req = connect(server.addr());
server.reply().status(hyper::Ok);
server.reply().status(hyper::Ok);
server.reply()
.status(hyper::Ok)
.header(ContentLength(12))
.body("Hello World\n");
server.reply()
.status(hyper::Ok)
.header(ContentLength(12))
.body("Hello World\n");
req.write_all(b"\
GET / HTTP/1.1\r\n\
@@ -687,6 +700,23 @@ fn pipeline_enabled() {
let mut buf = vec![0; 4096];
let n = req.read(&mut buf).expect("read 1");
assert_ne!(n, 0);
{
let mut lines = buf.split(|&b| b == b'\n');
assert_eq!(s(lines.next().unwrap()), "HTTP/1.1 200 OK\r");
assert_eq!(s(lines.next().unwrap()), "Content-Length: 12\r");
lines.next().unwrap(); // Date
assert_eq!(s(lines.next().unwrap()), "\r");
assert_eq!(s(lines.next().unwrap()), "Hello World");
assert_eq!(s(lines.next().unwrap()), "HTTP/1.1 200 OK\r");
assert_eq!(s(lines.next().unwrap()), "Content-Length: 12\r");
lines.next().unwrap(); // Date
assert_eq!(s(lines.next().unwrap()), "\r");
assert_eq!(s(lines.next().unwrap()), "Hello World");
}
// with pipeline enabled, both responses should have been in the first read
// so a second read should be EOF
let n = req.read(&mut buf).expect("read 2");
@@ -992,6 +1022,51 @@ fn max_buf_size() {
core.run(fut).unwrap_err();
}
#[test]
// End-to-end check that a streamed (chunked) body of 100 x 1000-byte chunks
// arrives in full; exercises the queued/vectored write path.
fn streaming_body() {
let _ = pretty_env_logger::try_init();
let mut core = Core::new().unwrap();
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap();
let addr = listener.local_addr().unwrap();
// Client runs on its own thread with blocking I/O while the server runs on
// the reactor below.
thread::spawn(move || {
let mut tcp = connect(&addr);
tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
let mut buf = [0; 8192];
let mut sum = tcp.read(&mut buf).expect("read 1");
let expected = "HTTP/1.1 200 ";
assert_eq!(s(&buf[..expected.len()]), expected);
// Drain until EOF (keep-alive is disabled, so the server closes).
loop {
let n = tcp.read(&mut buf).expect("read loop");
sum += n;
if n == 0 {
break;
}
}
// NOTE(review): 1_007_089 presumably = status line + headers + chunked
// framing + 100_000 body bytes; brittle if header output changes — confirm.
assert_eq!(sum, 1_007_089);
});
let fut = listener.incoming()
.into_future()
.map_err(|_| unreachable!())
.and_then(|(item, _incoming)| {
let (socket, _) = item.unwrap();
Http::<& &'static [u8]>::new()
.keep_alive(false)
.serve_connection(socket, service_fn(|_| {
// 100 chunks of 1000 'x' bytes, served as a lazy stream so the
// response is chunk-encoded rather than Content-Length'd.
static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _;
let b = ::futures::stream::iter_ok(S.iter());
Ok(Response::<futures::stream::IterOk<::std::slice::Iter<&'static [u8]>, ::hyper::Error>>::new()
.with_body(b))
}))
.map(|_| ())
});
core.run(fut).unwrap();
}
#[test]
fn remote_addr() {
let server = serve();
@@ -1085,6 +1160,12 @@ impl<'a> ReplyBuilder<'a> {
}
}
impl<'a> Drop for ReplyBuilder<'a> {
// Signal end-of-reply when the builder goes out of scope; the send result
// is ignored because the receiving test may already have shut down.
fn drop(&mut self) {
let _ = self.tx.send(Reply::End);
}
}
impl Drop for Serve {
fn drop(&mut self) {
drop(self.shutdown_signal.take());
@@ -1104,6 +1185,7 @@ enum Reply {
Status(hyper::StatusCode),
Headers(hyper::Headers),
Body(Vec<u8>),
End,
}
#[derive(Debug)]
@@ -1164,6 +1246,7 @@ impl Service for TestService {
Reply::Body(body) => {
res.set_body(body);
},
Reply::End => break,
}
}
res