Unbox client handshake future (#42)
This commit is contained in:
committed by
Carl Lerche
parent
2452cc4423
commit
a466646e19
@@ -4,20 +4,27 @@ use proto::{self, Connection, WindowSize, ProtoError};
|
|||||||
use error::Reason::*;
|
use error::Reason::*;
|
||||||
|
|
||||||
use http::{Request, Response};
|
use http::{Request, Response};
|
||||||
use futures::{Future, Poll, Sink, Async, AsyncSink};
|
use futures::{Future, Poll, Sink, Async, AsyncSink, AndThen, MapErr};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio_io::io::WriteAll;
|
||||||
use bytes::{Bytes, IntoBuf};
|
use bytes::{Bytes, IntoBuf};
|
||||||
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use std::io::Error as IoError;
|
||||||
|
|
||||||
/// In progress H2 connection binding
|
/// In progress H2 connection binding
|
||||||
pub struct Handshake<T, B: IntoBuf = Bytes> {
|
pub struct Handshake<T: AsyncRead + AsyncWrite, B: IntoBuf = Bytes> {
|
||||||
// TODO: unbox
|
inner:
|
||||||
inner: Box<Future<Item = Client<T, B>, Error = ConnectionError>>,
|
AndThen<
|
||||||
|
MapErr<WriteAll<T, &'static [u8]>, fn(IoError) -> ConnectionError>,
|
||||||
|
Result<Client<T, B>, ConnectionError>,
|
||||||
|
fn((T, &'static [u8])) -> Result<Client<T, B>, ConnectionError>
|
||||||
|
>
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Marker type indicating a client peer
|
/// Marker type indicating a client peer
|
||||||
pub struct Client<T, B: IntoBuf> {
|
pub struct Client<T: AsyncRead + AsyncWrite, B: IntoBuf> {
|
||||||
connection: Connection<T, Peer, B>,
|
connection: Connection<T, Peer, B>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -35,7 +42,7 @@ pub struct Body<B: IntoBuf> {
|
|||||||
pub(crate) struct Peer;
|
pub(crate) struct Peer;
|
||||||
|
|
||||||
impl<T> Client<T, Bytes>
|
impl<T> Client<T, Bytes>
|
||||||
where T: AsyncRead + AsyncWrite + 'static,
|
where T: AsyncRead + AsyncWrite,
|
||||||
{
|
{
|
||||||
pub fn handshake(io: T) -> Handshake<T, Bytes> {
|
pub fn handshake(io: T) -> Handshake<T, Bytes> {
|
||||||
Client::handshake2(io)
|
Client::handshake2(io)
|
||||||
@@ -43,9 +50,8 @@ impl<T> Client<T, Bytes>
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T, B> Client<T, B>
|
impl<T, B> Client<T, B>
|
||||||
// TODO: Get rid of 'static
|
where T: AsyncRead + AsyncWrite,
|
||||||
where T: AsyncRead + AsyncWrite + 'static,
|
B: IntoBuf
|
||||||
B: IntoBuf + 'static,
|
|
||||||
{
|
{
|
||||||
/// Bind an H2 client connection.
|
/// Bind an H2 client connection.
|
||||||
///
|
///
|
||||||
@@ -56,9 +62,9 @@ impl<T, B> Client<T, B>
|
|||||||
|
|
||||||
debug!("binding client connection");
|
debug!("binding client connection");
|
||||||
|
|
||||||
let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
|
let bind: fn((T, &'static [u8]))
|
||||||
.map_err(ConnectionError::from)
|
-> Result<Client<T, B>, ConnectionError> =
|
||||||
.and_then(|(io, _)| {
|
|(io, _)| {
|
||||||
debug!("client connection bound");
|
debug!("client connection bound");
|
||||||
|
|
||||||
let mut framed_write = proto::framed_write(io);
|
let mut framed_write = proto::framed_write(io);
|
||||||
@@ -73,9 +79,16 @@ impl<T, B> Client<T, B>
|
|||||||
Ok(_) => unreachable!(),
|
Ok(_) => unreachable!(),
|
||||||
Err(e) => Err(ConnectionError::from(e)),
|
Err(e) => Err(ConnectionError::from(e)),
|
||||||
}
|
}
|
||||||
});
|
};
|
||||||
|
|
||||||
Handshake { inner: Box::new(handshake) }
|
let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
|
||||||
|
let handshake = io::write_all(io, msg)
|
||||||
|
.map_err(ConnectionError::from as
|
||||||
|
fn(IoError) -> ConnectionError
|
||||||
|
)
|
||||||
|
.and_then(bind);
|
||||||
|
|
||||||
|
Handshake { inner: handshake }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns `Ready` when the connection can initialize a new HTTP 2.0
|
/// Returns `Ready` when the connection can initialize a new HTTP 2.0
|
||||||
@@ -97,8 +110,8 @@ impl<T, B> Client<T, B>
|
|||||||
|
|
||||||
impl<T, B> Future for Client<T, B>
|
impl<T, B> Future for Client<T, B>
|
||||||
// TODO: Get rid of 'static
|
// TODO: Get rid of 'static
|
||||||
where T: AsyncRead + AsyncWrite + 'static,
|
where T: AsyncRead + AsyncWrite,
|
||||||
B: IntoBuf + 'static,
|
B: IntoBuf,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Item = ();
|
||||||
type Error = ConnectionError;
|
type Error = ConnectionError;
|
||||||
@@ -109,7 +122,8 @@ impl<T, B> Future for Client<T, B>
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T, B> fmt::Debug for Client<T, B>
|
impl<T, B> fmt::Debug for Client<T, B>
|
||||||
where T: fmt::Debug,
|
where T: AsyncRead + AsyncWrite,
|
||||||
|
T: fmt::Debug,
|
||||||
B: fmt::Debug + IntoBuf,
|
B: fmt::Debug + IntoBuf,
|
||||||
B::Buf: fmt::Debug,
|
B::Buf: fmt::Debug,
|
||||||
{
|
{
|
||||||
@@ -122,7 +136,8 @@ impl<T, B> fmt::Debug for Client<T, B>
|
|||||||
|
|
||||||
// ===== impl Handshake =====
|
// ===== impl Handshake =====
|
||||||
|
|
||||||
impl<T, B: IntoBuf> Future for Handshake<T, B> {
|
impl<T, B: IntoBuf> Future for Handshake<T, B>
|
||||||
|
where T: AsyncRead + AsyncWrite {
|
||||||
type Item = Client<T, B>;
|
type Item = Client<T, B>;
|
||||||
type Error = ConnectionError;
|
type Error = ConnectionError;
|
||||||
|
|
||||||
@@ -132,7 +147,8 @@ impl<T, B: IntoBuf> Future for Handshake<T, B> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T, B> fmt::Debug for Handshake<T, B>
|
impl<T, B> fmt::Debug for Handshake<T, B>
|
||||||
where T: fmt::Debug,
|
where T: AsyncRead + AsyncWrite,
|
||||||
|
T: fmt::Debug,
|
||||||
B: fmt::Debug + IntoBuf,
|
B: fmt::Debug + IntoBuf,
|
||||||
B::Buf: fmt::Debug + IntoBuf,
|
B::Buf: fmt::Debug + IntoBuf,
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user