From 1e126aa7527bc474ab97c522f990c78da4e4c816 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 19 Oct 2017 12:21:13 -0700 Subject: [PATCH] Unbox server handshake future (#52) Server-side version of #42. I've rewritten `server::Handshake` as a hand-rolled `Future` rather than as a `Box`. In addition to removing a `Box`, this also means that the `'static` lifetime bounds on the type parameters `T` and `B` can be removed. The type of the server handshake future is somewhat more complex than the client-side handshake future. Note also that I've had to re-export `proto::streams::Prioritized` as `pub(crate)` from `proto`, as it appears in the type of the handshake future. I've ran the tests against this branch and everything passes. Since no new functionality was added, I haven't added any additional tests. This also fixes #158 - I had accidentally committed a Darwin h2spec executable and that's what was breaking CI. --- src/proto/mod.rs | 3 +- src/server.rs | 150 +++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 125 insertions(+), 28 deletions(-) diff --git a/src/proto/mod.rs b/src/proto/mod.rs index cebd303..d946144 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -9,12 +9,11 @@ pub(crate) use self::connection::Connection; pub(crate) use self::error::Error; pub(crate) use self::peer::Peer; pub(crate) use self::streams::{Key as StreamKey, StreamRef, Streams}; - +pub(crate) use self::streams::Prioritized; use codec::Codec; use self::ping_pong::PingPong; use self::settings::Settings; -use self::streams::Prioritized; use frame::{self, Frame}; diff --git a/src/server.rs b/src/server.rs index db7903b..b554996 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,18 +1,19 @@ use codec::{Codec, RecvError}; use frame::{self, Reason, Settings, StreamId}; -use proto::{self, Connection, WindowSize}; +use proto::{self, Connection, WindowSize, Prioritized}; use bytes::{Buf, Bytes, IntoBuf}; use futures::{self, Async, Future, Poll}; use http::{HeaderMap, Request, Response}; use tokio_io::{AsyncRead, AsyncWrite}; - -use std::fmt; +use std::{convert, fmt, mem}; /// In progress H2 connection binding -pub struct Handshake { - // TODO: unbox - inner: Box, Error = ::Error>>, +pub struct Handshake { + /// SETTINGS frame that will be sent once the connection is established. + settings: Settings, + /// The current state of the handshake. + state: Handshaking } /// Marker type indicating a client peer @@ -50,6 +51,16 @@ pub struct Send { eos: bool, } +/// Stages of an in-progress handshake. +enum Handshaking { + /// State 1. Server is flushing pending SETTINGS frame. + Flushing(Flush>), + /// State 2. Server is waiting for the client preface. + ReadingPreface(ReadPreface>), + /// Dummy state for `mem::replace`. + Empty, +} + /// Flush a Sink struct Flush { codec: Option>, @@ -94,31 +105,22 @@ where B: IntoBuf + 'static, { fn handshake2(io: T, settings: Settings) -> Handshake { - // Create the codec + // Create the codec. let mut codec = Codec::new(io); if let Some(max) = settings.max_frame_size() { codec.set_max_recv_frame_size(max as usize); } - // Send initial settings frame + // Send initial settings frame. codec .buffer(settings.clone().into()) .expect("invalid SETTINGS frame"); - // Flush pending settings frame and then wait for the client preface - let handshake = Flush::new(codec) - .and_then(ReadPreface::new) - .map(move |codec| { - let connection = Connection::new(codec, &settings, 2.into()); - Server { - connection, - } - }); + // Create the handshake future. + let state = Handshaking::from(codec); - Handshake { - inner: Box::new(handshake), - } + Handshake { settings, state } } /// Sets the target window size for the whole connection. @@ -132,7 +134,7 @@ where /// Returns `Ready` when the underlying connection has closed. pub fn poll_close(&mut self) -> Poll<(), ::Error> { self.connection.poll().map_err(Into::into) - } +} } impl futures::Stream for Server @@ -472,19 +474,64 @@ where // ===== impl Handshake ===== -impl Future for Handshake { +impl Future for Handshake + where T: AsyncRead + AsyncWrite, + B: IntoBuf, +{ type Item = Server; type Error = ::Error; fn poll(&mut self) -> Poll { - self.inner.poll() + trace!("Handshake::poll(); state={:?};", self.state); + use server::Handshaking::*; + + self.state = if let Flushing(ref mut flush) = self.state { + // We're currently flushing a pending SETTINGS frame. Poll the + // flush future, and, if it's completed, advance our state to wait + // for the client preface. + let codec = match flush.poll()? { + Async::NotReady => { + trace!("Handshake::poll(); flush.poll()=NotReady"); + return Ok(Async::NotReady); + }, + Async::Ready(flushed) => { + trace!("Handshake::poll(); flush.poll()=Ready"); + flushed + } + }; + Handshaking::from(ReadPreface::new(codec)) + } else { + // Otherwise, we haven't actually advanced the state, but we have + // to replace it with itself, because we have to return a value. + // (note that the assignment to `self.state` has to be outside of + // the `if let` block above in order to placate the borrow checker). + mem::replace(&mut self.state, Handshaking::Empty) + }; + let poll = if let ReadingPreface(ref mut read) = self.state { + // We're now waiting for the client preface. Poll the `ReadPreface` + // future. If it has completed, we will create a `Server` handle + // for the connection. + read.poll() + // Actually creating the `Connection` has to occur outside of this + // `if let` block, because we've borrowed `self` mutably in order + // to poll the state and won't be able to borrow the SETTINGS frame + // as well until we release the borrow for `poll()`. + } else { + unreachable!("Handshake::poll() state was not advanced completely!") + }; + let server = poll?.map(|codec| { + let connection = + Connection::new(codec, &self.settings, 2.into()); + trace!("Handshake::poll(); connection established!"); + Server { connection } + }); + Ok(server) } } impl fmt::Debug for Handshake -where - T: fmt::Debug, - B: fmt::Debug + IntoBuf, + where T: AsyncRead + AsyncWrite + fmt::Debug, + B: fmt::Debug + IntoBuf, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "server::Handshake") @@ -604,3 +651,54 @@ impl proto::Peer for Peer { Ok(request) } } + + + +// ===== impl Handshaking ===== +impl fmt::Debug for Handshaking +where + B: IntoBuf +{ + #[inline] fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + match *self { + Handshaking::Flushing(_) => + write!(f, "Handshaking::Flushing(_)"), + Handshaking::ReadingPreface(_) => + write!(f, "Handshaking::ReadingPreface(_)"), + Handshaking::Empty => + write!(f, "Handshaking::Empty"), + } + + } +} + +impl convert::From>> for Handshaking +where + T: AsyncRead + AsyncWrite, + B: IntoBuf, +{ + #[inline] fn from(flush: Flush>) -> Self { + Handshaking::Flushing(flush) + } +} + +impl convert::From>> for + Handshaking +where + T: AsyncRead + AsyncWrite, + B: IntoBuf, +{ + #[inline] fn from(read: ReadPreface>) -> Self { + Handshaking::ReadingPreface(read) + } +} + +impl convert::From>> for Handshaking +where + T: AsyncRead + AsyncWrite, + B: IntoBuf, +{ + #[inline] fn from(codec: Codec>) -> Self { + Handshaking::from(Flush::new(codec)) + } +}