use std::io; use futures::{Async, AsyncSink, Future, Poll, Stream}; use futures::sync::oneshot; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_service::Service; use proto::{Body, Conn, KeepAlive, Http1Transaction, MessageHead, RequestHead, ResponseHead}; use ::StatusCode; pub struct Dispatcher { conn: Conn, dispatch: D, body_tx: Option<::proto::body::ChunkSender>, body_rx: Option, is_closing: bool, } pub trait Dispatch { type PollItem; type PollBody; type RecvItem; fn poll_msg(&mut self) -> Poll)>, ::Error>; fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Option)>) -> ::Result<()>; fn poll_ready(&mut self) -> Poll<(), ()>; fn should_poll(&self) -> bool; } pub struct Server { in_flight: Option, service: S, } pub struct Client { callback: Option>>, rx: ClientRx, } pub type ClientMsg = (RequestHead, Option); type ClientRx = ::client::dispatch::Receiver, ::Response>; impl Dispatcher where D: Dispatch, PollBody=Bs, RecvItem=MessageHead>, I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, K: KeepAlive, Bs: Stream, { pub fn new(dispatch: D, conn: Conn) -> Self { Dispatcher { conn: conn, dispatch: dispatch, body_tx: None, body_rx: None, is_closing: false, } } pub fn disable_keep_alive(&mut self) { self.conn.disable_keep_alive() } fn poll2(&mut self) -> Poll<(), ::Error> { self.poll_read()?; self.poll_write()?; self.poll_flush()?; if self.is_done() { try_ready!(self.conn.shutdown()); self.conn.take_error()?; trace!("Dispatch::poll done"); Ok(Async::Ready(())) } else { Ok(Async::NotReady) } } fn poll_read(&mut self) -> Poll<(), ::Error> { loop { if self.is_closing { return Ok(Async::Ready(())); } else if self.conn.can_read_head() { try_ready!(self.poll_read_head()); } else if let Some(mut body) = self.body_tx.take() { if self.conn.can_read_body() { match body.poll_ready() { Ok(Async::Ready(())) => (), Ok(Async::NotReady) => { self.body_tx = Some(body); return Ok(Async::NotReady); }, Err(_canceled) => { // user doesn't care about the body // so we should stop reading trace!("body receiver dropped before eof, closing"); self.conn.close_read(); return Ok(Async::Ready(())); } } match self.conn.read_body() { Ok(Async::Ready(Some(chunk))) => { match body.start_send(Ok(chunk)) { Ok(AsyncSink::Ready) => { self.body_tx = Some(body); }, Ok(AsyncSink::NotReady(_chunk)) => { unreachable!("mpsc poll_ready was ready, start_send was not"); } Err(_canceled) => { if self.conn.can_read_body() { trace!("body receiver dropped before eof, closing"); self.conn.close_read(); } } } }, Ok(Async::Ready(None)) => { // just drop, the body will close automatically }, Ok(Async::NotReady) => { self.body_tx = Some(body); return Ok(Async::NotReady); } Err(e) => { let _ = body.start_send(Err(::Error::Io(e))); } } } else { // just drop, the body will close automatically } } else { return self.conn.read_keep_alive().map(Async::Ready); } } } fn poll_read_head(&mut self) -> Poll<(), ::Error> { // can dispatch receive, or does it still care about, an incoming message? match self.dispatch.poll_ready() { Ok(Async::Ready(())) => (), Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"), Err(()) => { trace!("dispatch no longer receiving messages"); self.close(); return Ok(Async::Ready(())); } } // dispatch is ready for a message, try to read one match self.conn.read_head() { Ok(Async::Ready(Some((head, has_body)))) => { let body = if has_body { let (mut tx, rx) = ::proto::body::channel(); let _ = tx.poll_ready(); // register this task if rx is dropped self.body_tx = Some(tx); Some(rx) } else { None }; self.dispatch.recv_msg(Ok((head, body)))?; Ok(Async::Ready(())) }, Ok(Async::Ready(None)) => { // read eof, conn will start to shutdown automatically Ok(Async::Ready(())) } Ok(Async::NotReady) => Ok(Async::NotReady), Err(err) => { debug!("read_head error: {}", err); self.dispatch.recv_msg(Err(err))?; // if here, the dispatcher gave the user the error // somewhere else. we still need to shutdown, but // not as a second error. Ok(Async::Ready(())) } } } fn poll_write(&mut self) -> Poll<(), ::Error> { loop { if self.is_closing { return Ok(Async::Ready(())); } else if self.body_rx.is_none() && self.dispatch.should_poll() { if let Some((head, body)) = try_ready!(self.dispatch.poll_msg()) { self.conn.write_head(head, body.is_some()); self.body_rx = body; } else { self.close(); return Ok(Async::Ready(())); } } else if !self.conn.can_buffer_body() { try_ready!(self.poll_flush()); } else if let Some(mut body) = self.body_rx.take() { let chunk = match body.poll()? { Async::Ready(Some(chunk)) => { self.body_rx = Some(body); chunk }, Async::Ready(None) => { if self.conn.can_write_body() { self.conn.write_body(None)?; } continue; }, Async::NotReady => { self.body_rx = Some(body); return Ok(Async::NotReady); } }; if self.conn.can_write_body() { assert!(self.conn.write_body(Some(chunk))?.is_ready()); // This allows when chunk is `None`, or `Some([])`. } else if chunk.as_ref().len() == 0 { // ok } else { warn!("unexpected chunk when body cannot write"); } } else { return Ok(Async::NotReady); } } } fn poll_flush(&mut self) -> Poll<(), ::Error> { self.conn.flush().map_err(|err| { debug!("error writing: {}", err); err.into() }) } fn close(&mut self) { self.is_closing = true; self.conn.close_read(); self.conn.close_write(); } fn is_done(&self) -> bool { if self.is_closing { return true; } let read_done = self.conn.is_read_closed(); if !T::should_read_first() && read_done { // a client that cannot read may was well be done. true } else { let write_done = self.conn.is_write_closed() || (!self.dispatch.should_poll() && self.body_rx.is_none()); read_done && write_done } } } impl Future for Dispatcher where D: Dispatch, PollBody=Bs, RecvItem=MessageHead>, I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, K: KeepAlive, Bs: Stream, { type Item = (); type Error = ::Error; #[inline] fn poll(&mut self) -> Poll { trace!("Dispatcher::poll"); self.poll2().or_else(|e| { // An error means we're shutting down either way. // We just try to give the error to the user, // and close the connection with an Ok. If we // cannot give it to the user, then return the Err. self.dispatch.recv_msg(Err(e)).map(Async::Ready) }) } } // ===== impl Server ===== impl Server where S: Service { pub fn new(service: S) -> Server { Server { in_flight: None, service: service, } } } impl Dispatch for Server where S: Service, Error=::Error>, Bs: Stream, Bs::Item: AsRef<[u8]>, { type PollItem = MessageHead; type PollBody = Bs; type RecvItem = RequestHead; fn poll_msg(&mut self) -> Poll)>, ::Error> { if let Some(mut fut) = self.in_flight.take() { let resp = match fut.poll()? { Async::Ready(res) => res, Async::NotReady => { self.in_flight = Some(fut); return Ok(Async::NotReady); } }; let (head, body) = ::proto::response::split(resp); Ok(Async::Ready(Some((head.into(), body)))) } else { unreachable!("poll_msg shouldn't be called if no inflight"); } } fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Option)>) -> ::Result<()> { let (msg, body) = msg?; let req = ::proto::request::from_wire(None, msg, body); self.in_flight = Some(self.service.call(req)); Ok(()) } fn poll_ready(&mut self) -> Poll<(), ()> { if self.in_flight.is_some() { Ok(Async::NotReady) } else { Ok(Async::Ready(())) } } fn should_poll(&self) -> bool { self.in_flight.is_some() } } // ===== impl Client ===== impl Client { pub fn new(rx: ClientRx) -> Client { Client { callback: None, rx: rx, } } } impl Dispatch for Client where B: Stream, B::Item: AsRef<[u8]>, { type PollItem = RequestHead; type PollBody = B; type RecvItem = ResponseHead; fn poll_msg(&mut self) -> Poll)>, ::Error> { match self.rx.poll() { Ok(Async::Ready(Some(((head, body), mut cb)))) => { // check that future hasn't been canceled already match cb.poll_cancel().expect("poll_cancel cannot error") { Async::Ready(()) => { trace!("request canceled"); Ok(Async::Ready(None)) }, Async::NotReady => { self.callback = Some(cb); Ok(Async::Ready(Some((head, body)))) } } }, Ok(Async::Ready(None)) => { trace!("client tx closed"); // user has dropped sender handle Ok(Async::Ready(None)) }, Ok(Async::NotReady) => return Ok(Async::NotReady), Err(_) => unreachable!("receiver cannot error"), } } fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Option)>) -> ::Result<()> { match msg { Ok((msg, body)) => { if let Some(cb) = self.callback.take() { let res = ::proto::response::from_wire(msg, body); let _ = cb.send(Ok(res)); Ok(()) } else { Err(::Error::Io(io::Error::new(io::ErrorKind::InvalidData, "response received without matching request"))) } }, Err(err) => { if let Some(cb) = self.callback.take() { let _ = cb.send(Err(err)); Ok(()) } else if let Ok(Async::Ready(Some((_, cb)))) = self.rx.poll() { // in this case, the message was never even started, so it's safe to tell // the user that the request was completely canceled let _ = cb.send(Err(::Error::new_canceled(Some(err)))); Ok(()) } else { Err(err) } } } } fn poll_ready(&mut self) -> Poll<(), ()> { match self.callback { Some(ref mut cb) => match cb.poll_cancel() { Ok(Async::Ready(())) => { trace!("callback receiver has dropped"); Err(()) }, Ok(Async::NotReady) => Ok(Async::Ready(())), Err(_) => unreachable!("oneshot poll_cancel cannot error"), }, None => Err(()), } } fn should_poll(&self) -> bool { self.callback.is_none() } } #[cfg(test)] mod tests { extern crate pretty_env_logger; use super::*; use mock::AsyncIo; use proto::ClientTransaction; #[test] fn client_read_bytes_before_writing_request() { let _ = pretty_env_logger::try_init(); ::futures::lazy(|| { let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 100); let (tx, rx) = ::client::dispatch::channel(); let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io, Default::default()); let mut dispatcher = Dispatcher::new(Client::new(rx), conn); let req = RequestHead { version: ::HttpVersion::Http11, subject: ::proto::RequestLine::default(), headers: Default::default(), }; let res_rx = tx.send((req, None::<::Body>)).unwrap(); let a1 = dispatcher.poll().expect("error should be sent on channel"); assert!(a1.is_ready(), "dispatcher should be closed"); let err = res_rx.wait() .expect("callback poll") .expect_err("callback response"); match err { ::Error::Cancel(_) => (), other => panic!("expected Cancel(_), got {:?}", other), } Ok::<(), ()>(()) }).wait().unwrap(); } }