Unbox server handshake future (#52)

Server-side version of #42. I've rewritten `server::Handshake` as a hand-rolled `Future` rather than as a `Box<Future>`. In addition to removing a `Box`, this also means that the `'static` lifetime bounds on the type parameters `T` and `B` can be removed.

The type of the server handshake future is somewhat more complex than the client-side handshake future. Note also that I've had to re-export `proto::streams::Prioritized` as `pub(crate)` from `proto`, as it appears in the type of the handshake future.

I've ran the tests against this branch and everything passes. Since no new functionality was added, I haven't added any additional tests.

This also fixes #158 - I had accidentally committed a Darwin h2spec executable and that's what was breaking CI.
This commit is contained in:
Eliza Weisman
2017-10-19 12:21:13 -07:00
committed by GitHub
parent 58c55564e2
commit 1e126aa752
2 changed files with 125 additions and 28 deletions

View File

@@ -9,12 +9,11 @@ pub(crate) use self::connection::Connection;
pub(crate) use self::error::Error;
pub(crate) use self::peer::Peer;
pub(crate) use self::streams::{Key as StreamKey, StreamRef, Streams};
pub(crate) use self::streams::Prioritized;
use codec::Codec;
use self::ping_pong::PingPong;
use self::settings::Settings;
use self::streams::Prioritized;
use frame::{self, Frame};

View File

@@ -1,18 +1,19 @@
use codec::{Codec, RecvError};
use frame::{self, Reason, Settings, StreamId};
use proto::{self, Connection, WindowSize};
use proto::{self, Connection, WindowSize, Prioritized};
use bytes::{Buf, Bytes, IntoBuf};
use futures::{self, Async, Future, Poll};
use http::{HeaderMap, Request, Response};
use tokio_io::{AsyncRead, AsyncWrite};
use std::fmt;
use std::{convert, fmt, mem};
/// In progress H2 connection binding
pub struct Handshake<T, B: IntoBuf = Bytes> {
// TODO: unbox
inner: Box<Future<Item = Server<T, B>, Error = ::Error>>,
pub struct Handshake<T: AsyncRead + AsyncWrite, B: IntoBuf = Bytes> {
/// SETTINGS frame that will be sent once the connection is established.
settings: Settings,
/// The current state of the handshake.
state: Handshaking<T, B>
}
/// Marker type indicating a client peer
@@ -50,6 +51,16 @@ pub struct Send<T> {
eos: bool,
}
/// Stages of an in-progress handshake.
enum Handshaking<T, B: IntoBuf> {
/// State 1. Server is flushing pending SETTINGS frame.
Flushing(Flush<T, Prioritized<B::Buf>>),
/// State 2. Server is waiting for the client preface.
ReadingPreface(ReadPreface<T, Prioritized<B::Buf>>),
/// Dummy state for `mem::replace`.
Empty,
}
/// Flush a Sink
struct Flush<T, B> {
codec: Option<Codec<T, B>>,
@@ -94,31 +105,22 @@ where
B: IntoBuf + 'static,
{
fn handshake2(io: T, settings: Settings) -> Handshake<T, B> {
// Create the codec
// Create the codec.
let mut codec = Codec::new(io);
if let Some(max) = settings.max_frame_size() {
codec.set_max_recv_frame_size(max as usize);
}
// Send initial settings frame
// Send initial settings frame.
codec
.buffer(settings.clone().into())
.expect("invalid SETTINGS frame");
// Flush pending settings frame and then wait for the client preface
let handshake = Flush::new(codec)
.and_then(ReadPreface::new)
.map(move |codec| {
let connection = Connection::new(codec, &settings, 2.into());
Server {
connection,
}
});
// Create the handshake future.
let state = Handshaking::from(codec);
Handshake {
inner: Box::new(handshake),
}
Handshake { settings, state }
}
/// Sets the target window size for the whole connection.
@@ -132,7 +134,7 @@ where
/// Returns `Ready` when the underlying connection has closed.
pub fn poll_close(&mut self) -> Poll<(), ::Error> {
self.connection.poll().map_err(Into::into)
}
}
}
impl<T, B> futures::Stream for Server<T, B>
@@ -472,19 +474,64 @@ where
// ===== impl Handshake =====
impl<T, B: IntoBuf> Future for Handshake<T, B> {
impl<T, B: IntoBuf> Future for Handshake<T, B>
where T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
type Item = Server<T, B>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
trace!("Handshake::poll(); state={:?};", self.state);
use server::Handshaking::*;
self.state = if let Flushing(ref mut flush) = self.state {
// We're currently flushing a pending SETTINGS frame. Poll the
// flush future, and, if it's completed, advance our state to wait
// for the client preface.
let codec = match flush.poll()? {
Async::NotReady => {
trace!("Handshake::poll(); flush.poll()=NotReady");
return Ok(Async::NotReady);
},
Async::Ready(flushed) => {
trace!("Handshake::poll(); flush.poll()=Ready");
flushed
}
};
Handshaking::from(ReadPreface::new(codec))
} else {
// Otherwise, we haven't actually advanced the state, but we have
// to replace it with itself, because we have to return a value.
// (note that the assignment to `self.state` has to be outside of
// the `if let` block above in order to placate the borrow checker).
mem::replace(&mut self.state, Handshaking::Empty)
};
let poll = if let ReadingPreface(ref mut read) = self.state {
// We're now waiting for the client preface. Poll the `ReadPreface`
// future. If it has completed, we will create a `Server` handle
// for the connection.
read.poll()
// Actually creating the `Connection` has to occur outside of this
// `if let` block, because we've borrowed `self` mutably in order
// to poll the state and won't be able to borrow the SETTINGS frame
// as well until we release the borrow for `poll()`.
} else {
unreachable!("Handshake::poll() state was not advanced completely!")
};
let server = poll?.map(|codec| {
let connection =
Connection::new(codec, &self.settings, 2.into());
trace!("Handshake::poll(); connection established!");
Server { connection }
});
Ok(server)
}
}
impl<T, B> fmt::Debug for Handshake<T, B>
where
T: fmt::Debug,
B: fmt::Debug + IntoBuf,
where T: AsyncRead + AsyncWrite + fmt::Debug,
B: fmt::Debug + IntoBuf,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "server::Handshake")
@@ -604,3 +651,54 @@ impl proto::Peer for Peer {
Ok(request)
}
}
// ===== impl Handshaking =====
impl<T, B> fmt::Debug for Handshaking<T, B>
where
B: IntoBuf
{
#[inline] fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
Handshaking::Flushing(_) =>
write!(f, "Handshaking::Flushing(_)"),
Handshaking::ReadingPreface(_) =>
write!(f, "Handshaking::ReadingPreface(_)"),
Handshaking::Empty =>
write!(f, "Handshaking::Empty"),
}
}
}
impl<T, B> convert::From<Flush<T, Prioritized<B::Buf>>> for Handshaking<T, B>
where
T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
#[inline] fn from(flush: Flush<T, Prioritized<B::Buf>>) -> Self {
Handshaking::Flushing(flush)
}
}
impl<T, B> convert::From<ReadPreface<T, Prioritized<B::Buf>>> for
Handshaking<T, B>
where
T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
#[inline] fn from(read: ReadPreface<T, Prioritized<B::Buf>>) -> Self {
Handshaking::ReadingPreface(read)
}
}
impl<T, B> convert::From<Codec<T, Prioritized<B::Buf>>> for Handshaking<T, B>
where
T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
#[inline] fn from(codec: Codec<T, Prioritized<B::Buf>>) -> Self {
Handshaking::from(Flush::new(codec))
}
}