feat(lib): add support to disable tokio-proto internals

For now, this adds `client::Config::no_proto`, `server::Http::no_proto`,
and `server::Server::no_proto` to skip tokio-proto implementations, and
use an internal dispatch system instead.

`Http::no_proto` is similar to `Http::bind_connection`, but returns a
`Connection` that is a `Future` to drive HTTP with the provided service.
Any errors prior to parsing a request, and after delivering a response
(but before flush the response body) will be returned from this future.

See #1342 for more.
This commit is contained in:
Sean McArthur
2017-10-02 15:05:40 -07:00
parent 8153cfaebf
commit f7532b71d1
14 changed files with 1040 additions and 155 deletions

View File

@@ -7,6 +7,7 @@ use std::borrow::Cow;
use super::Chunk;
pub type TokioBody = tokio_proto::streaming::Body<Chunk, ::Error>;
pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
/// A `Stream` for `Chunk`s used in requests and responses.
#[must_use = "streams do nothing unless polled"]

View File

@@ -7,7 +7,7 @@ use futures::task::Task;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_proto::streaming::pipeline::{Frame, Transport};
use proto::{Http1Transaction};
use proto::Http1Transaction;
use super::io::{Cursor, Buffered};
use super::h1::{Encoder, Decoder};
use method::Method;
@@ -51,15 +51,28 @@ where I: AsyncRead + AsyncWrite,
self.io.set_flush_pipeline(enabled);
}
fn poll2(&mut self) -> Poll<Option<Frame<super::MessageHead<T::Incoming>, super::Chunk, ::Error>>, io::Error> {
trace!("Conn::poll()");
fn poll_incoming(&mut self) -> Poll<Option<Frame<super::MessageHead<T::Incoming>, super::Chunk, ::Error>>, io::Error> {
trace!("Conn::poll_incoming()");
loop {
if self.is_read_closed() {
trace!("Conn::poll when closed");
return Ok(Async::Ready(None));
} else if self.can_read_head() {
return self.read_head();
return match self.read_head() {
Ok(Async::Ready(Some((head, body)))) => {
Ok(Async::Ready(Some(Frame::Message {
message: head,
body: body,
})))
},
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(::Error::Io(err)) => Err(err),
Err(err) => Ok(Async::Ready(Some(Frame::Error {
error: err,
}))),
};
} else if self.can_write_continue() {
try_nb!(self.flush());
} else if self.can_read_body() {
@@ -79,16 +92,15 @@ where I: AsyncRead + AsyncWrite,
}
}
fn is_read_closed(&self) -> bool {
pub fn is_read_closed(&self) -> bool {
self.state.is_read_closed()
}
#[allow(unused)]
fn is_write_closed(&self) -> bool {
pub fn is_write_closed(&self) -> bool {
self.state.is_write_closed()
}
fn can_read_head(&self) -> bool {
pub fn can_read_head(&self) -> bool {
match self.state.reading {
Reading::Init => true,
_ => false,
@@ -102,14 +114,14 @@ where I: AsyncRead + AsyncWrite,
}
}
fn can_read_body(&self) -> bool {
pub fn can_read_body(&self) -> bool {
match self.state.reading {
Reading::Body(..) => true,
_ => false,
}
}
fn read_head(&mut self) -> Poll<Option<Frame<super::MessageHead<T::Incoming>, super::Chunk, ::Error>>, io::Error> {
pub fn read_head(&mut self) -> Poll<Option<(super::MessageHead<T::Incoming>, bool)>, ::Error> {
debug_assert!(self.can_read_head());
trace!("Conn::read_head");
@@ -117,13 +129,16 @@ where I: AsyncRead + AsyncWrite,
Ok(Async::Ready(head)) => (head.version, head),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
let must_respond_with_error = !self.state.is_idle();
// If we are currently waiting on a message, then an empty
// message should be reported as an error. If not, it is just
// the connection closing gracefully.
let must_error = !self.state.is_idle() && T::should_error_on_parse_eof();
self.state.close_read();
self.io.consume_leading_lines();
let was_mid_parse = !self.io.read_buf().is_empty();
return if was_mid_parse || must_respond_with_error {
return if was_mid_parse || must_error {
debug!("parse error ({}) with {} bytes", e, self.io.read_buf().len());
Ok(Async::Ready(Some(Frame::Error { error: e })))
Err(e)
} else {
debug!("read eof");
Ok(Async::Ready(None))
@@ -138,7 +153,7 @@ where I: AsyncRead + AsyncWrite,
Err(e) => {
debug!("decoder error = {:?}", e);
self.state.close_read();
return Ok(Async::Ready(Some(Frame::Error { error: e })));
return Err(e);
}
};
self.state.busy();
@@ -154,17 +169,17 @@ where I: AsyncRead + AsyncWrite,
(true, Reading::Body(decoder))
};
self.state.reading = reading;
Ok(Async::Ready(Some(Frame::Message { message: head, body: body })))
Ok(Async::Ready(Some((head, body))))
},
_ => {
error!("unimplemented HTTP Version = {:?}", version);
self.state.close_read();
Ok(Async::Ready(Some(Frame::Error { error: ::Error::Version })))
Err(::Error::Version)
}
}
}
fn read_body(&mut self) -> Poll<Option<super::Chunk>, io::Error> {
pub fn read_body(&mut self) -> Poll<Option<super::Chunk>, io::Error> {
debug_assert!(self.can_read_body());
trace!("Conn::read_body");
@@ -187,7 +202,7 @@ where I: AsyncRead + AsyncWrite,
ret
}
fn maybe_park_read(&mut self) {
pub fn maybe_park_read(&mut self) {
if !self.io.is_read_blocked() {
// the Io object is ready to read, which means it will never alert
// us that it is ready until we drain it. However, we're currently
@@ -236,13 +251,16 @@ where I: AsyncRead + AsyncWrite,
return
},
Err(e) => {
trace!("maybe_notify read_from_io error: {}", e);
trace!("maybe_notify; read_from_io error: {}", e);
self.state.close();
}
}
}
if let Some(ref task) = self.state.read_task {
trace!("maybe_notify; notifying task");
task.notify();
} else {
trace!("maybe_notify; no task to notify");
}
}
}
@@ -252,14 +270,14 @@ where I: AsyncRead + AsyncWrite,
self.maybe_notify();
}
fn can_write_head(&self) -> bool {
pub fn can_write_head(&self) -> bool {
match self.state.writing {
Writing::Continue(..) | Writing::Init => true,
_ => false
}
}
fn can_write_body(&self) -> bool {
pub fn can_write_body(&self) -> bool {
match self.state.writing {
Writing::Body(..) => true,
Writing::Continue(..) |
@@ -277,7 +295,7 @@ where I: AsyncRead + AsyncWrite,
}
}
fn write_head(&mut self, head: super::MessageHead<T::Outgoing>, body: bool) {
pub fn write_head(&mut self, head: super::MessageHead<T::Outgoing>, body: bool) {
debug_assert!(self.can_write_head());
let wants_keep_alive = head.should_keep_alive();
@@ -298,7 +316,7 @@ where I: AsyncRead + AsyncWrite,
};
}
fn write_body(&mut self, chunk: Option<B>) -> StartSend<Option<B>, io::Error> {
pub fn write_body(&mut self, chunk: Option<B>) -> StartSend<Option<B>, io::Error> {
debug_assert!(self.can_write_body());
if self.has_queued_body() {
@@ -397,7 +415,7 @@ where I: AsyncRead + AsyncWrite,
Ok(Async::Ready(()))
}
fn flush(&mut self) -> Poll<(), io::Error> {
pub fn flush(&mut self) -> Poll<(), io::Error> {
loop {
let queue_finished = try!(self.write_queued()).is_ready();
try_nb!(self.io.flush());
@@ -410,8 +428,18 @@ where I: AsyncRead + AsyncWrite,
Ok(Async::Ready(()))
}
pub fn close_read(&mut self) {
self.state.close_read();
}
pub fn close_write(&mut self) {
self.state.close_write();
}
}
// ==== tokio_proto impl ====
impl<I, B, T, K> Stream for Conn<I, B, T, K>
where I: AsyncRead + AsyncWrite,
B: AsRef<[u8]>,
@@ -423,7 +451,7 @@ where I: AsyncRead + AsyncWrite,
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.poll2().map_err(|err| {
self.poll_incoming().map_err(|err| {
debug!("poll error: {}", err);
err
})
@@ -635,6 +663,12 @@ impl<B, K: KeepAlive> State<B, K> {
self.keep_alive.disable();
}
fn close_write(&mut self) {
trace!("State::close_write()");
self.writing = Writing::Closed;
self.keep_alive.disable();
}
fn try_keep_alive(&mut self) {
match (&self.reading, &self.writing) {
(&Reading::KeepAlive, &Writing::KeepAlive) => {
@@ -652,14 +686,6 @@ impl<B, K: KeepAlive> State<B, K> {
}
}
fn is_idle(&self) -> bool {
if let KA::Idle = self.keep_alive.status() {
true
} else {
false
}
}
fn busy(&mut self) {
if let KA::Disabled = self.keep_alive.status() {
return;
@@ -674,6 +700,14 @@ impl<B, K: KeepAlive> State<B, K> {
self.keep_alive.idle();
}
fn is_idle(&self) -> bool {
if let KA::Idle = self.keep_alive.status() {
true
} else {
false
}
}
fn is_read_closed(&self) -> bool {
match self.reading {
Reading::Closed => true,
@@ -681,7 +715,6 @@ impl<B, K: KeepAlive> State<B, K> {
}
}
#[allow(unused)]
fn is_write_closed(&self) -> bool {
match self.writing {
Writing::Closed => true,
@@ -727,7 +760,7 @@ mod tests {
use futures::future;
use tokio_proto::streaming::pipeline::Frame;
use proto::{self, MessageHead, ServerTransaction};
use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
use super::super::h1::Encoder;
use mock::AsyncIo;
@@ -799,21 +832,32 @@ mod tests {
let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.idle();
match conn.poll().unwrap() {
Async::Ready(Some(Frame::Error { .. })) => {},
other => panic!("frame is not Error: {:?}", other)
match conn.poll() {
Err(ref err) if err.kind() == ::std::io::ErrorKind::UnexpectedEof => {},
other => panic!("unexpected frame: {:?}", other)
}
}
#[test]
fn test_conn_init_read_eof_busy() {
// server ignores
let io = AsyncIo::new_buf(vec![], 1);
let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.busy();
match conn.poll().unwrap() {
Async::Ready(Some(Frame::Error { .. })) => {},
other => panic!("frame is not Error: {:?}", other)
Async::Ready(None) => {},
other => panic!("unexpected frame: {:?}", other)
}
// client, when busy, returns the error
let io = AsyncIo::new_buf(vec![], 1);
let mut conn = Conn::<_, proto::Chunk, ClientTransaction>::new(io, Default::default());
conn.state.busy();
match conn.poll() {
Err(ref err) if err.kind() == ::std::io::ErrorKind::UnexpectedEof => {},
other => panic!("unexpected frame: {:?}", other)
}
}

325
src/proto/dispatch.rs Normal file
View File

@@ -0,0 +1,325 @@
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use futures::sync::{mpsc, oneshot};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_service::Service;
use super::{Body, Conn, KeepAlive, Http1Transaction, MessageHead, RequestHead, ResponseHead};
use ::StatusCode;
pub struct Dispatcher<D, Bs, I, B, T, K> {
conn: Conn<I, B, T, K>,
dispatch: D,
body_tx: Option<super::body::BodySender>,
body_rx: Option<Bs>,
}
pub trait Dispatch {
type PollItem;
type PollBody;
type RecvItem;
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error>;
fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>;
fn should_poll(&self) -> bool;
}
pub struct Server<S: Service> {
in_flight: Option<S::Future>,
service: S,
}
pub struct Client<B> {
callback: Option<oneshot::Sender<::Result<::Response>>>,
rx: ClientRx<B>,
}
type ClientRx<B> = mpsc::Receiver<(RequestHead, Option<B>, oneshot::Sender<::Result<::Response>>)>;
impl<D, Bs, I, B, T, K> Dispatcher<D, Bs, I, B, T, K>
where
D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>,
I: AsyncRead + AsyncWrite,
B: AsRef<[u8]>,
T: Http1Transaction,
K: KeepAlive,
Bs: Stream<Item=B, Error=::Error>,
{
pub fn new(dispatch: D, conn: Conn<I, B, T, K>) -> Self {
Dispatcher {
conn: conn,
dispatch: dispatch,
body_tx: None,
body_rx: None,
}
}
fn poll_read(&mut self) -> Poll<(), ::Error> {
loop {
if self.conn.can_read_head() {
match self.conn.read_head() {
Ok(Async::Ready(Some((head, has_body)))) => {
let body = if has_body {
let (tx, rx) = super::Body::pair();
self.body_tx = Some(tx);
rx
} else {
Body::empty()
};
self.dispatch.recv_msg(Ok((head, body))).expect("recv_msg with Ok shouldn't error");
},
Ok(Async::Ready(None)) => {
// read eof, conn will start to shutdown automatically
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
debug!("read_head error: {}", err);
self.dispatch.recv_msg(Err(err))?;
// if here, the dispatcher gave the user the error
// somewhere else. we still need to shutdown, but
// not as a second error.
return Ok(Async::Ready(()));
}
}
} else if let Some(mut body) = self.body_tx.take() {
let can_read_body = self.conn.can_read_body();
match body.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => {
self.body_tx = Some(body);
return Ok(Async::NotReady);
},
Err(_canceled) => {
// user doesn't care about the body
// so we should stop reading
if can_read_body {
trace!("body receiver dropped before eof, closing");
self.conn.close_read();
return Ok(Async::Ready(()));
}
}
}
if can_read_body {
match self.conn.read_body() {
Ok(Async::Ready(Some(chunk))) => {
match body.start_send(Ok(chunk)) {
Ok(AsyncSink::Ready) => {
self.body_tx = Some(body);
},
Ok(AsyncSink::NotReady(_chunk)) => {
unreachable!("mpsc poll_ready was ready, start_send was not");
}
Err(_canceled) => {
if self.conn.can_read_body() {
trace!("body receiver dropped before eof, closing");
self.conn.close_read();
}
}
}
},
Ok(Async::Ready(None)) => {
let _ = body.close();
},
Ok(Async::NotReady) => {
self.body_tx = Some(body);
return Ok(Async::NotReady);
}
Err(e) => {
let _ = body.start_send(Err(::Error::Io(e)));
}
}
} else {
let _ = body.close();
}
} else {
self.conn.maybe_park_read();
return Ok(Async::Ready(()));
}
}
}
fn poll_write(&mut self) -> Poll<(), ::Error> {
loop {
if self.body_rx.is_none() && self.dispatch.should_poll() {
if let Some((head, body)) = try_ready!(self.dispatch.poll_msg()) {
self.conn.write_head(head, body.is_some());
self.body_rx = body;
} else {
self.conn.close_write();
return Ok(Async::Ready(()));
}
} else if let Some(mut body) = self.body_rx.take() {
let chunk = match body.poll()? {
Async::Ready(Some(chunk)) => {
self.body_rx = Some(body);
chunk
},
Async::Ready(None) => {
if self.conn.can_write_body() {
self.conn.write_body(None)?;
}
continue;
},
Async::NotReady => {
self.body_rx = Some(body);
return Ok(Async::NotReady);
}
};
self.conn.write_body(Some(chunk))?;
} else {
return Ok(Async::NotReady);
}
}
}
fn poll_flush(&mut self) -> Poll<(), ::Error> {
self.conn.flush().map_err(|err| {
debug!("error writing: {}", err);
err.into()
})
}
fn is_done(&self) -> bool {
let read_done = self.conn.is_read_closed();
let write_done = self.conn.is_write_closed() ||
(!self.dispatch.should_poll() && self.body_rx.is_none());
read_done && write_done
}
}
impl<D, Bs, I, B, T, K> Future for Dispatcher<D, Bs, I, B, T, K>
where
D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>,
I: AsyncRead + AsyncWrite,
B: AsRef<[u8]>,
T: Http1Transaction,
K: KeepAlive,
Bs: Stream<Item=B, Error=::Error>,
{
type Item = ();
type Error = ::Error;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.poll_read()?;
self.poll_write()?;
self.poll_flush()?;
if self.is_done() {
trace!("Dispatch::poll done");
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}
// ===== impl Server =====
impl<S> Server<S> where S: Service {
pub fn new(service: S) -> Server<S> {
Server {
in_flight: None,
service: service,
}
}
}
impl<S, Bs> Dispatch for Server<S>
where
S: Service<Request=::Request, Response=::Response<Bs>, Error=::Error>,
Bs: Stream<Error=::Error>,
Bs::Item: AsRef<[u8]>,
{
type PollItem = MessageHead<StatusCode>;
type PollBody = Bs;
type RecvItem = RequestHead;
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error> {
if let Some(mut fut) = self.in_flight.take() {
let resp = match fut.poll()? {
Async::Ready(res) => res,
Async::NotReady => {
self.in_flight = Some(fut);
return Ok(Async::NotReady);
}
};
let (head, body) = super::response::split(resp);
Ok(Async::Ready(Some((head.into(), body))))
} else {
unreachable!("poll_msg shouldn't be called if no inflight");
}
}
fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> {
let (msg, body) = msg?;
let req = super::request::from_wire(None, msg, body);
self.in_flight = Some(self.service.call(req));
Ok(())
}
fn should_poll(&self) -> bool {
self.in_flight.is_some()
}
}
// ===== impl Client =====
impl<B> Client<B> {
pub fn new(rx: ClientRx<B>) -> Client<B> {
Client {
callback: None,
rx: rx,
}
}
}
impl<B> Dispatch for Client<B>
where
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
type PollItem = RequestHead;
type PollBody = B;
type RecvItem = ResponseHead;
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error> {
match self.rx.poll() {
Ok(Async::Ready(Some((head, body, cb)))) => {
self.callback = Some(cb);
Ok(Async::Ready(Some((head, body))))
},
Ok(Async::Ready(None)) => {
// user has dropped sender handle
Ok(Async::Ready(None))
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!("mpsc receiver cannot error"),
}
}
fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> {
match msg {
Ok((msg, body)) => {
let res = super::response::from_wire(msg, Some(body));
let cb = self.callback.take().expect("recv_msg without callback");
let _ = cb.send(Ok(res));
Ok(())
},
Err(err) => {
if let Some(cb) = self.callback.take() {
let _ = cb.send(Err(err));
Ok(())
} else {
Err(err)
}
}
}
}
fn should_poll(&self) -> bool {
self.callback.is_none()
}
}

View File

@@ -132,6 +132,10 @@ impl Http1Transaction for ServerTransaction {
extend(dst, b"\r\n");
body
}
fn should_error_on_parse_eof() -> bool {
false
}
}
impl ServerTransaction {
@@ -281,6 +285,10 @@ impl Http1Transaction for ClientTransaction {
body
}
fn should_error_on_parse_eof() -> bool {
true
}
}
impl ClientTransaction {

View File

@@ -84,7 +84,7 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
match try_ready!(self.read_from_io()) {
0 => {
trace!("parse eof");
//TODO: With Rust 1.14, this can be Error::from(ErrorKind)
//TODO: utilize Error::Incomplete when Error type is redesigned
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ParseEof).into());
}
_ => {},
@@ -335,13 +335,13 @@ struct ParseEof;
impl fmt::Display for ParseEof {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("parse eof")
f.write_str(::std::error::Error::description(self))
}
}
impl ::std::error::Error for ParseEof {
fn description(&self) -> &str {
"parse eof"
"end of file reached before parsing could complete"
}
}

View File

@@ -19,6 +19,7 @@ pub use self::chunk::Chunk;
mod body;
mod chunk;
mod conn;
pub mod dispatch;
mod io;
mod h1;
//mod h2;
@@ -146,6 +147,8 @@ pub trait Http1Transaction {
fn parse(bytes: &mut BytesMut) -> ParseResult<Self::Incoming>;
fn decoder(head: &MessageHead<Self::Incoming>, method: &mut Option<::Method>) -> ::Result<h1::Decoder>;
fn encode(head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> h1::Encoder;
fn should_error_on_parse_eof() -> bool;
}
pub type ParseResult<T> = ::Result<Option<(MessageHead<T>, usize)>>;