feat(http): allow specifying custom body streams

This commit is contained in:
Sean McArthur
2017-02-16 12:39:50 -08:00
parent 2266d869ca
commit 1b1311a7d3
11 changed files with 384 additions and 276 deletions

View File

@@ -8,7 +8,7 @@ use tokio::io::Io;
use tokio_proto::streaming::pipeline::{Frame, Transport};
use header::{ContentLength, TransferEncoding};
use http::{self, Http1Transaction};
use http::{self, Http1Transaction, DebugTruncate};
use http::io::{Cursor, Buffered};
use http::h1::{Encoder, Decoder};
use version::HttpVersion;
@@ -21,14 +21,19 @@ use version::HttpVersion;
/// The connection will determine when a message begins and ends as well as
/// determine if this connection can be kept alive after the message,
/// or if it is complete.
pub struct Conn<I, T, K = KA> {
pub struct Conn<I, B, T, K = KA> {
io: Buffered<I>,
state: State<K>,
state: State<B, K>,
_marker: PhantomData<T>
}
impl<I: Io, T: Http1Transaction, K: KeepAlive> Conn<I, T, K> {
pub fn new(io: I, keep_alive: K) -> Conn<I, T, K> {
impl<I, B, T, K> Conn<I, B, T, K>
where I: Io,
B: AsRef<[u8]>,
T: Http1Transaction,
K: KeepAlive
{
pub fn new(io: I, keep_alive: K) -> Conn<I, B, T, K> {
Conn {
io: Buffered::new(io),
state: State {
@@ -169,7 +174,10 @@ impl<I: Io, T: Http1Transaction, K: KeepAlive> Conn<I, T, K> {
fn can_write_body(&self) -> bool {
match self.state.writing {
Writing::Body(..) => true,
_ => false
Writing::Init |
Writing::Ending(..) |
Writing::KeepAlive |
Writing::Closed => false,
}
}
@@ -189,7 +197,8 @@ impl<I: Io, T: Http1Transaction, K: KeepAlive> Conn<I, T, K> {
self.state.keep_alive &= wants_keep_alive;
let mut buf = Vec::new();
let encoder = T::encode(&mut head, &mut buf);
self.io.buffer(buf);
//TODO: handle when there isn't enough room to buffer the head
assert!(self.io.buffer(buf) > 0);
self.state.writing = if body {
Writing::Body(encoder, None)
} else {
@@ -199,7 +208,7 @@ impl<I: Io, T: Http1Transaction, K: KeepAlive> Conn<I, T, K> {
Ok(AsyncSink::Ready)
}
fn write_body(&mut self, chunk: Option<http::Chunk>) -> StartSend<Option<http::Chunk>, io::Error> {
fn write_body(&mut self, chunk: Option<B>) -> StartSend<Option<B>, io::Error> {
debug_assert!(self.can_write_body());
let state = match self.state.writing {
@@ -207,47 +216,41 @@ impl<I: Io, T: Http1Transaction, K: KeepAlive> Conn<I, T, K> {
if queued.is_some() {
return Ok(AsyncSink::NotReady(chunk));
}
let mut is_done = true;
let mut wbuf = Cursor::new(match chunk {
Some(chunk) => {
is_done = false;
chunk
}
None => {
// Encode a zero length chunk
// the http1 encoder does the right thing
// encoding either the final chunk or ignoring the input
http::Chunk::from(Vec::new())
}
});
if let Some(chunk) = chunk {
let mut cursor = Cursor::new(chunk);
match encoder.encode(&mut self.io, cursor.buf()) {
Ok(n) => {
cursor.consume(n);
match encoder.encode(&mut self.io, wbuf.buf()) {
Ok(n) => {
wbuf.consume(n);
if !wbuf.is_written() {
trace!("Conn::start_send frame not written, queued");
*queued = Some(wbuf);
}
},
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => {
trace!("Conn::start_send frame not written, queued");
*queued = Some(wbuf);
if !cursor.is_written() {
trace!("Conn::start_send frame not written, queued");
*queued = Some(cursor);
}
},
_ => return Err(e)
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => {
trace!("Conn::start_send frame not written, queued");
*queued = Some(cursor);
},
_ => return Err(e)
}
}
}
if encoder.is_eof() {
Writing::KeepAlive
} else if is_done {
Writing::Closed
if encoder.is_eof() {
Writing::KeepAlive
} else {
return Ok(AsyncSink::Ready);
}
} else {
return Ok(AsyncSink::Ready);
// end of stream, that means we should try to eof
match encoder.eof() {
Ok(Some(end)) => Writing::Ending(Cursor::new(end)),
Ok(None) => Writing::KeepAlive,
Err(_not_eof) => Writing::Closed,
}
}
},
Writing::Init | Writing::KeepAlive | Writing::Closed => unreachable!(),
_ => unreachable!(),
};
self.state.writing = state;
Ok(AsyncSink::Ready)
@@ -255,7 +258,7 @@ impl<I: Io, T: Http1Transaction, K: KeepAlive> Conn<I, T, K> {
fn write_queued(&mut self) -> Poll<(), io::Error> {
trace!("Conn::write_queued()");
match self.state.writing {
let state = match self.state.writing {
Writing::Body(ref mut encoder, ref mut queued) => {
let complete = if let Some(chunk) = queued.as_mut() {
let n = try_nb!(encoder.encode(&mut self.io, chunk.buf()));
@@ -265,15 +268,26 @@ impl<I: Io, T: Http1Transaction, K: KeepAlive> Conn<I, T, K> {
true
};
trace!("Conn::write_queued complete = {}", complete);
if complete {
return if complete {
*queued = None;
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
};
},
Writing::Ending(ref mut ending) => {
let n = self.io.buffer(ending.buf());
ending.consume(n);
if ending.is_written() {
Writing::KeepAlive
} else {
return Ok(Async::NotReady);
}
},
_ => Ok(Async::Ready(())),
}
_ => return Ok(Async::Ready(())),
};
self.state.writing = state;
Ok(Async::Ready(()))
}
fn flush(&mut self) -> Poll<(), io::Error> {
@@ -289,8 +303,9 @@ impl<I: Io, T: Http1Transaction, K: KeepAlive> Conn<I, T, K> {
}
}
impl<I, T, K> Stream for Conn<I, T, K>
impl<I, B, T, K> Stream for Conn<I, B, T, K>
where I: Io,
B: AsRef<[u8]>,
T: Http1Transaction,
K: KeepAlive,
T::Outgoing: fmt::Debug {
@@ -317,12 +332,13 @@ where I: Io,
}
}
impl<I, T, K> Sink for Conn<I, T, K>
impl<I, B, T, K> Sink for Conn<I, B, T, K>
where I: Io,
B: AsRef<[u8]>,
T: Http1Transaction,
K: KeepAlive,
T::Outgoing: fmt::Debug {
type SinkItem = Frame<http::MessageHead<T::Outgoing>, http::Chunk, ::Error>;
type SinkItem = Frame<http::MessageHead<T::Outgoing>, B, ::Error>;
type SinkError = io::Error;
fn start_send(&mut self, frame: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
@@ -371,7 +387,7 @@ where I: Io,
},
};
error!("writing illegal frame; state={:?}, frame={:?}", self.state.writing, frame);
error!("writing illegal frame; state={:?}, frame={:?}", self.state.writing, DebugFrame(&frame));
Err(io::Error::new(io::ErrorKind::InvalidInput, "illegal frame"))
}
@@ -384,13 +400,14 @@ where I: Io,
}
}
impl<I, T, K> Transport for Conn<I, T, K>
impl<I, B, T, K> Transport for Conn<I, B, T, K>
where I: Io + 'static,
B: AsRef<[u8]> + 'static,
T: Http1Transaction + 'static,
K: KeepAlive + 'static,
T::Outgoing: fmt::Debug {}
impl<I, T, K: fmt::Debug> fmt::Debug for Conn<I, T, K> {
impl<I, B: AsRef<[u8]>, T, K: fmt::Debug> fmt::Debug for Conn<I, B, T, K> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Conn")
.field("state", &self.state)
@@ -399,10 +416,9 @@ impl<I, T, K: fmt::Debug> fmt::Debug for Conn<I, T, K> {
}
}
#[derive(Debug)]
struct State<K> {
struct State<B, K> {
reading: Reading,
writing: Writing,
writing: Writing<B>,
keep_alive: K,
}
@@ -414,14 +430,41 @@ enum Reading {
Closed,
}
#[derive(Debug)]
enum Writing {
enum Writing<B> {
Init,
Body(Encoder, Option<Cursor<http::Chunk>>),
Body(Encoder, Option<Cursor<B>>),
Ending(Cursor<&'static [u8]>),
KeepAlive,
Closed,
}
impl<B: AsRef<[u8]>, K: fmt::Debug> fmt::Debug for State<B, K> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("State")
.field("reading", &self.reading)
.field("writing", &self.writing)
.field("keep_alive", &self.keep_alive)
.finish()
}
}
impl<B: AsRef<[u8]>> fmt::Debug for Writing<B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Writing::Init => f.write_str("Init"),
Writing::Body(ref enc, ref queued) => f.debug_tuple("Body")
.field(enc)
.field(queued)
.finish(),
Writing::Ending(ref ending) => f.debug_tuple("Ending")
.field(ending)
.finish(),
Writing::KeepAlive => f.write_str("KeepAlive"),
Writing::Closed => f.write_str("Closed"),
}
}
}
impl ::std::ops::BitAndAssign<bool> for KA {
fn bitand_assign(&mut self, enabled: bool) {
if !enabled {
@@ -468,7 +511,7 @@ impl KeepAlive for KA {
}
}
impl<K: KeepAlive> State<K> {
impl<B, K: KeepAlive> State<B, K> {
fn close(&mut self) {
trace!("State::close()");
self.reading = Reading::Closed;
@@ -525,9 +568,9 @@ impl<K: KeepAlive> State<K> {
// The DebugFrame and DebugChunk are simple Debug implementations that allow
// us to dump the frame into logs, without logging the entirety of the bytes.
struct DebugFrame<'a, T: fmt::Debug + 'a>(&'a Frame<http::MessageHead<T>, http::Chunk, ::Error>);
struct DebugFrame<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a>(&'a Frame<http::MessageHead<T>, B, ::Error>);
impl<'a, T: fmt::Debug + 'a> fmt::Debug for DebugFrame<'a, T> {
impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a, T, B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self.0 {
Frame::Message { ref message, ref body } => {
@@ -538,7 +581,7 @@ impl<'a, T: fmt::Debug + 'a> fmt::Debug for DebugFrame<'a, T> {
},
Frame::Body { chunk: Some(ref chunk) } => {
f.debug_struct("Body")
.field("chunk", &DebugChunk(chunk))
.field("chunk", &DebugTruncate(chunk.as_ref()))
.finish()
},
Frame::Body { chunk: None } => {
@@ -555,22 +598,12 @@ impl<'a, T: fmt::Debug + 'a> fmt::Debug for DebugFrame<'a, T> {
}
}
struct DebugChunk<'a>(&'a http::Chunk);
impl<'a> fmt::Debug for DebugChunk<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("Chunk")
.field(&self.0.len())
.finish()
}
}
#[cfg(test)]
mod tests {
use futures::{Async, Stream, Sink};
use tokio_proto::streaming::pipeline::Frame;
use http::{MessageHead, ServerTransaction};
use http::{self, MessageHead, ServerTransaction};
use http::h1::Encoder;
use mock::AsyncIo;
@@ -584,7 +617,7 @@ mod tests {
let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
let len = good_message.len();
let io = AsyncIo::new_buf(good_message, len);
let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default());
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
match conn.poll().unwrap() {
Async::Ready(Some(Frame::Message { message, body: false })) => {
@@ -601,7 +634,7 @@ mod tests {
fn test_conn_parse_partial() {
let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
let io = AsyncIo::new_buf(good_message, 10);
let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default());
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
assert!(conn.poll().unwrap().is_not_ready());
conn.io.io_mut().block_in(50);
let async = conn.poll().unwrap();
@@ -615,7 +648,7 @@ mod tests {
#[test]
fn test_conn_closed_read() {
let io = AsyncIo::new_buf(vec![], 0);
let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default());
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.close();
match conn.poll().unwrap() {
@@ -631,7 +664,7 @@ mod tests {
let _ = pretty_env_logger::init();
let _: Result<(), ()> = ::futures::lazy(|| {
let io = AsyncIo::new_buf(vec![], 0);
let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default());
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
let max = ::http::io::MAX_BUFFER_SIZE + 4096;
conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64), None);
@@ -668,7 +701,7 @@ mod tests {
use ::futures::Future;
let _: Result<(), ()> = ::futures::lazy(|| {
let io = AsyncIo::new_buf(vec![], 4096);
let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default());
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.writing = Writing::Body(Encoder::chunked(), None);
assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
@@ -679,7 +712,7 @@ mod tests {
#[test]
fn test_conn_closed_write() {
let io = AsyncIo::new_buf(vec![], 0);
let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default());
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.close();
match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {