use futures::{Async, Future, Poll, Stream}; use h2::Reason; use h2::server::{Builder, Connection, Handshake, SendResponse}; use tokio_io::{AsyncRead, AsyncWrite}; use ::body::Payload; use ::common::Exec; use ::service::Service; use super::{PipeToSendStream, SendBuf}; use ::{Body, Response}; pub(crate) struct Server where S: Service, B: Payload, { exec: Exec, service: S, state: State, } enum State where B: Payload, { Handshaking(Handshake>), Serving(Serving), } struct Serving where B: Payload, { conn: Connection>, } impl Server where T: AsyncRead + AsyncWrite, S: Service, S::Error: Into>, S::Future: Send + 'static, B: Payload, { pub(crate) fn new(io: T, service: S, exec: Exec) -> Server { let handshake = Builder::new() .handshake(io); Server { exec, state: State::Handshaking(handshake), service, } } pub fn graceful_shutdown(&mut self) { unimplemented!("h2 server graceful shutdown"); } } impl Future for Server where T: AsyncRead + AsyncWrite, S: Service, S::Error: Into>, S::Future: Send + 'static, B: Payload, { type Item = (); type Error = ::Error; fn poll(&mut self) -> Poll { loop { let next = match self.state { State::Handshaking(ref mut h) => { let conn = try_ready!(h.poll().map_err(::Error::new_h2)); State::Serving(Serving { conn: conn, }) }, State::Serving(ref mut srv) => { return srv.poll_server(&mut self.service, &self.exec); } }; self.state = next; } } } impl Serving where T: AsyncRead + AsyncWrite, B: Payload, { fn poll_server(&mut self, service: &mut S, exec: &Exec) -> Poll<(), ::Error> where S: Service< ReqBody=Body, ResBody=B, >, S::Error: Into>, S::Future: Send + 'static, { while let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) { trace!("incoming request"); let req = req.map(::Body::h2); let fut = H2Stream::new(service.call(req), respond); exec.execute(fut); } // no more incoming streams... trace!("incoming connection complete"); Ok(Async::Ready(())) } } struct H2Stream where B: Payload, { reply: SendResponse>, state: H2StreamState, } enum H2StreamState where B: Payload, { Service(F), Body(PipeToSendStream), } impl H2Stream where F: Future>, F::Error: Into>, B: Payload, { fn new(fut: F, respond: SendResponse>) -> H2Stream { H2Stream { reply: respond, state: H2StreamState::Service(fut), } } fn poll2(&mut self) -> Poll<(), ::Error> { loop { let next = match self.state { H2StreamState::Service(ref mut h) => { let res = try_ready!(h.poll().map_err(::Error::new_user_service)); let (head, body) = res.into_parts(); let mut res = ::http::Response::from_parts(head, ()); super::strip_connection_headers(res.headers_mut()); macro_rules! reply { ($eos:expr) => ({ match self.reply.send_response(res, $eos) { Ok(tx) => tx, Err(e) => { trace!("send response error: {}", e); self.reply.send_reset(Reason::INTERNAL_ERROR); return Err(::Error::new_h2(e)); } } }) } if !body.is_end_stream() { let body_tx = reply!(false); H2StreamState::Body(PipeToSendStream::new(body, body_tx)) } else { reply!(true); return Ok(Async::Ready(())); } }, H2StreamState::Body(ref mut pipe) => { return pipe.poll(); } }; self.state = next; } } } impl Future for H2Stream where F: Future>, F::Error: Into>, B: Payload, { type Item = (); type Error = (); fn poll(&mut self) -> Poll { self.poll2() .map_err(|e| debug!("stream error: {}", e)) } }