More restructuring

This commit is contained in:
Carl Lerche
2017-08-02 14:48:10 -07:00
parent 77681674e2
commit 9f9bf85168
7 changed files with 105 additions and 89 deletions

View File

@@ -1,4 +1,5 @@
use {frame, proto, Peer, ConnectionError, StreamId}; use {frame, ConnectionError, StreamId};
use proto::{self, Connection};
use http; use http;
use futures::{Future, Poll, Sink, AsyncSink}; use futures::{Future, Poll, Sink, AsyncSink};
@@ -10,55 +11,63 @@ use std::fmt;
/// In progress H2 connection binding /// In progress H2 connection binding
pub struct Handshake<T, B: IntoBuf = Bytes> { pub struct Handshake<T, B: IntoBuf = Bytes> {
// TODO: unbox // TODO: unbox
inner: Box<Future<Item = Connection<T, B>, Error = ConnectionError>>, inner: Box<Future<Item = Client<T, B>, Error = ConnectionError>>,
} }
#[derive(Debug)]
struct Peer;
/// Marker type indicating a client peer /// Marker type indicating a client peer
#[derive(Debug)] pub struct Client<T, B: IntoBuf> {
pub struct Client; connection: Connection<T, Peer, B>,
pub type Connection<T, B = Bytes> = super::Connection<T, Client, B>;
pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
where T: AsyncRead + AsyncWrite + 'static,
{
handshake2(io)
} }
/// Bind an H2 client connection. impl<T> Client<T, Bytes>
/// where T: AsyncRead + AsyncWrite + 'static,
/// Returns a future which resolves to the connection value once the H2 {
/// handshake has been completed. pub fn handshake(io: T) -> Handshake<T, Bytes> {
pub fn handshake2<T, B>(io: T) -> Handshake<T, B> Client::handshake2(io)
}
}
impl<T, B> Client<T, B>
// TODO: Get rid of 'static
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
B: IntoBuf + 'static, B: IntoBuf + 'static,
{ {
use tokio_io::io; /// Bind an H2 client connection.
///
/// Returns a future which resolves to the connection value once the H2
/// handshake has been completed.
pub fn handshake2(io: T) -> Handshake<T, B> {
use tokio_io::io;
debug!("binding client connection"); debug!("binding client connection");
let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
.map_err(ConnectionError::from) .map_err(ConnectionError::from)
.and_then(|(io, _)| { .and_then(|(io, _)| {
debug!("client connection bound"); debug!("client connection bound");
let mut framed_write = proto::framed_write(io); let mut framed_write = proto::framed_write(io);
let settings = frame::Settings::default(); let settings = frame::Settings::default();
// Send initial settings frame // Send initial settings frame
match framed_write.start_send(settings.into()) { match framed_write.start_send(settings.into()) {
Ok(AsyncSink::Ready) => { Ok(AsyncSink::Ready) => {
Ok(proto::from_framed_write(framed_write)) let conn = proto::from_framed_write(framed_write);
Ok(Client { connection: conn })
}
Ok(_) => unreachable!(),
Err(e) => Err(ConnectionError::from(e)),
} }
Ok(_) => unreachable!(), });
Err(e) => Err(ConnectionError::from(e)),
}
});
Handshake { inner: Box::new(handshake) } Handshake { inner: Box::new(handshake) }
}
} }
impl Peer for Client { impl proto::Peer for Peer {
type Send = http::request::Head; type Send = http::request::Head;
type Poll = http::response::Head; type Poll = http::response::Head;
@@ -98,7 +107,7 @@ impl Peer for Client {
} }
impl<T, B: IntoBuf> Future for Handshake<T, B> { impl<T, B: IntoBuf> Future for Handshake<T, B> {
type Item = Connection<T, B>; type Item = Client<T, B>;
type Error = ConnectionError; type Error = ConnectionError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@@ -109,8 +118,21 @@ impl<T, B: IntoBuf> Future for Handshake<T, B> {
impl<T, B> fmt::Debug for Handshake<T, B> impl<T, B> fmt::Debug for Handshake<T, B>
where T: fmt::Debug, where T: fmt::Debug,
B: fmt::Debug + IntoBuf, B: fmt::Debug + IntoBuf,
B::Buf: fmt::Debug + IntoBuf,
{ {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "client::Handshake") write!(fmt, "client::Handshake")
} }
} }
impl<T, B> fmt::Debug for Client<T, B>
where T: fmt::Debug,
B: fmt::Debug + IntoBuf,
B::Buf: fmt::Debug + IntoBuf,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Client")
.field("connection", &self.connection)
.finish()
}
}

View File

@@ -30,11 +30,10 @@ pub mod error;
mod hpack; mod hpack;
mod proto; mod proto;
mod frame; mod frame;
pub mod server; // pub mod server;
pub use error::{ConnectionError, Reason}; pub use error::{ConnectionError, Reason};
pub use frame::StreamId; pub use frame::StreamId;
pub use proto::Connection;
use bytes::Bytes; use bytes::Bytes;
@@ -68,23 +67,3 @@ pub enum Frame<T, B = Bytes> {
error: Reason, error: Reason,
}, },
} }
/// Either a Client or a Server
pub trait Peer {
/// Message type sent into the transport
type Send;
/// Message type polled from the transport
type Poll;
fn is_server() -> bool;
#[doc(hidden)]
fn convert_send_message(
id: StreamId,
headers: Self::Send,
end_of_stream: bool) -> frame::Headers;
#[doc(hidden)]
fn convert_poll_message(headers: frame::Headers) -> Self::Poll;
}

View File

@@ -1,8 +1,6 @@
use {ConnectionError, Frame, Peer}; use {ConnectionError, Frame};
use HeaderMap; use HeaderMap;
use frame::{self, StreamId}; use frame::{self, StreamId};
use client::Client;
use server::Server;
use proto::*; use proto::*;
@@ -26,34 +24,29 @@ pub struct Connection<T, P, B: IntoBuf = Bytes> {
_phantom: PhantomData<P>, _phantom: PhantomData<P>,
} }
pub fn new<T, P, B>(codec: Codec<T, B::Buf>)
-> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
// TODO: Actually configure
let streams = Streams::new(streams::Config {
max_remote_initiated: None,
init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
max_local_initiated: None,
init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
});
Connection {
codec: codec,
ping_pong: PingPong::new(),
settings: Settings::new(),
streams: streams,
_phantom: PhantomData,
}
}
impl<T, P, B> Connection<T, P, B> impl<T, P, B> Connection<T, P, B>
where T: AsyncRead + AsyncWrite, where T: AsyncRead + AsyncWrite,
P: Peer, P: Peer,
B: IntoBuf, B: IntoBuf,
{ {
pub fn new(codec: Codec<T, B::Buf>) -> Connection<T, P, B> {
// TODO: Actually configure
let streams = Streams::new(streams::Config {
max_remote_initiated: None,
init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
max_local_initiated: None,
init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
});
Connection {
codec: codec,
ping_pong: PingPong::new(),
settings: Settings::new(),
streams: streams,
_phantom: PhantomData,
}
}
/// Polls for the next update to a remote flow control window. /// Polls for the next update to a remote flow control window.
pub fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> { pub fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
self.streams.poll_window_update() self.streams.poll_window_update()
@@ -250,6 +243,7 @@ impl<T, P, B> Connection<T, P, B>
} }
} }
/*
impl<T, B> Connection<T, Client, B> impl<T, B> Connection<T, Client, B>
where T: AsyncRead + AsyncWrite, where T: AsyncRead + AsyncWrite,
B: IntoBuf, B: IntoBuf,
@@ -296,6 +290,7 @@ impl<T, B> Connection<T, Server, B>
}) })
} }
} }
*/
impl<T, P, B> Stream for Connection<T, P, B> impl<T, P, B> Stream for Connection<T, P, B>
where T: AsyncRead + AsyncWrite, where T: AsyncRead + AsyncWrite,

View File

@@ -13,15 +13,35 @@ use self::ping_pong::PingPong;
use self::settings::Settings; use self::settings::Settings;
use self::streams::Streams; use self::streams::Streams;
use {StreamId, Peer}; use StreamId;
use error::Reason; use error::Reason;
use frame::Frame; use frame::{self, Frame};
use futures::*; use futures::*;
use bytes::{Buf, IntoBuf}; use bytes::{Buf, IntoBuf};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited; use tokio_io::codec::length_delimited;
/// Either a Client or a Server
pub trait Peer {
/// Message type sent into the transport
type Send;
/// Message type polled from the transport
type Poll;
fn is_server() -> bool;
#[doc(hidden)]
fn convert_send_message(
id: StreamId,
headers: Self::Send,
end_of_stream: bool) -> frame::Headers;
#[doc(hidden)]
fn convert_poll_message(headers: frame::Headers) -> Self::Poll;
}
pub type PingPayload = [u8; 8]; pub type PingPayload = [u8; 8];
pub type WindowSize = u32; pub type WindowSize = u32;
@@ -69,7 +89,7 @@ pub fn from_framed_write<T, P, B>(framed_write: FramedWrite<T, B::Buf>)
let codec = FramedRead::new(framed); let codec = FramedRead::new(framed);
connection::new(codec) Connection::new(codec)
} }
impl WindowUpdate { impl WindowUpdate {

View File

@@ -14,7 +14,7 @@ use self::send::Send;
use self::state::State; use self::state::State;
use self::store::{Store, Entry}; use self::store::{Store, Entry};
use {frame, Peer, StreamId, ConnectionError}; use {frame, StreamId, ConnectionError};
use proto::*; use proto::*;
use error::Reason::*; use error::Reason::*;
use error::User::*; use error::User::*;

View File

@@ -1,4 +1,4 @@
use {frame, Peer, ConnectionError}; use {frame, ConnectionError};
use proto::*; use proto::*;
use super::*; use super::*;

View File

@@ -1,4 +1,4 @@
use {frame, Peer, ConnectionError}; use {frame, ConnectionError};
use proto::*; use proto::*;
use super::*; use super::*;