From 9f9bf85168e7060a152abdee5c1166b370603cd5 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 2 Aug 2017 14:48:10 -0700 Subject: [PATCH] More restructuring --- src/client.rs | 92 ++++++++++++++++++++++++--------------- src/lib.rs | 23 +--------- src/proto/connection.rs | 47 +++++++++----------- src/proto/mod.rs | 26 +++++++++-- src/proto/streams/mod.rs | 2 +- src/proto/streams/recv.rs | 2 +- src/proto/streams/send.rs | 2 +- 7 files changed, 105 insertions(+), 89 deletions(-) diff --git a/src/client.rs b/src/client.rs index a504087..4167be2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,5 @@ -use {frame, proto, Peer, ConnectionError, StreamId}; +use {frame, ConnectionError, StreamId}; +use proto::{self, Connection}; use http; use futures::{Future, Poll, Sink, AsyncSink}; @@ -10,55 +11,63 @@ use std::fmt; /// In progress H2 connection binding pub struct Handshake { // TODO: unbox - inner: Box, Error = ConnectionError>>, + inner: Box, Error = ConnectionError>>, } +#[derive(Debug)] +struct Peer; + /// Marker type indicating a client peer -#[derive(Debug)] -pub struct Client; - -pub type Connection = super::Connection; - -pub fn handshake(io: T) -> Handshake - where T: AsyncRead + AsyncWrite + 'static, -{ - handshake2(io) +pub struct Client { + connection: Connection, } -/// Bind an H2 client connection. -/// -/// Returns a future which resolves to the connection value once the H2 -/// handshake has been completed. -pub fn handshake2(io: T) -> Handshake +impl Client + where T: AsyncRead + AsyncWrite + 'static, +{ + pub fn handshake(io: T) -> Handshake { + Client::handshake2(io) + } +} + +impl Client + // TODO: Get rid of 'static where T: AsyncRead + AsyncWrite + 'static, B: IntoBuf + 'static, { - use tokio_io::io; + /// Bind an H2 client connection. + /// + /// Returns a future which resolves to the connection value once the H2 + /// handshake has been completed. + pub fn handshake2(io: T) -> Handshake { + use tokio_io::io; - debug!("binding client connection"); + debug!("binding client connection"); - let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") - .map_err(ConnectionError::from) - .and_then(|(io, _)| { - debug!("client connection bound"); + let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") + .map_err(ConnectionError::from) + .and_then(|(io, _)| { + debug!("client connection bound"); - let mut framed_write = proto::framed_write(io); - let settings = frame::Settings::default(); + let mut framed_write = proto::framed_write(io); + let settings = frame::Settings::default(); - // Send initial settings frame - match framed_write.start_send(settings.into()) { - Ok(AsyncSink::Ready) => { - Ok(proto::from_framed_write(framed_write)) + // Send initial settings frame + match framed_write.start_send(settings.into()) { + Ok(AsyncSink::Ready) => { + let conn = proto::from_framed_write(framed_write); + Ok(Client { connection: conn }) + } + Ok(_) => unreachable!(), + Err(e) => Err(ConnectionError::from(e)), } - Ok(_) => unreachable!(), - Err(e) => Err(ConnectionError::from(e)), - } - }); + }); - Handshake { inner: Box::new(handshake) } + Handshake { inner: Box::new(handshake) } + } } -impl Peer for Client { +impl proto::Peer for Peer { type Send = http::request::Head; type Poll = http::response::Head; @@ -98,7 +107,7 @@ impl Peer for Client { } impl Future for Handshake { - type Item = Connection; + type Item = Client; type Error = ConnectionError; fn poll(&mut self) -> Poll { @@ -109,8 +118,21 @@ impl Future for Handshake { impl fmt::Debug for Handshake where T: fmt::Debug, B: fmt::Debug + IntoBuf, + B::Buf: fmt::Debug + IntoBuf, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "client::Handshake") } } + +impl fmt::Debug for Client + where T: fmt::Debug, + B: fmt::Debug + IntoBuf, + B::Buf: fmt::Debug + IntoBuf, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Client") + .field("connection", &self.connection) + .finish() + } +} diff --git a/src/lib.rs b/src/lib.rs index b4fdf4d..1eee29b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,11 +30,10 @@ pub mod error; mod hpack; mod proto; mod frame; -pub mod server; +// pub mod server; pub use error::{ConnectionError, Reason}; pub use frame::StreamId; -pub use proto::Connection; use bytes::Bytes; @@ -68,23 +67,3 @@ pub enum Frame { error: Reason, }, } - -/// Either a Client or a Server -pub trait Peer { - /// Message type sent into the transport - type Send; - - /// Message type polled from the transport - type Poll; - - fn is_server() -> bool; - - #[doc(hidden)] - fn convert_send_message( - id: StreamId, - headers: Self::Send, - end_of_stream: bool) -> frame::Headers; - - #[doc(hidden)] - fn convert_poll_message(headers: frame::Headers) -> Self::Poll; -} diff --git a/src/proto/connection.rs b/src/proto/connection.rs index b9f70b2..234432b 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,8 +1,6 @@ -use {ConnectionError, Frame, Peer}; +use {ConnectionError, Frame}; use HeaderMap; use frame::{self, StreamId}; -use client::Client; -use server::Server; use proto::*; @@ -26,34 +24,29 @@ pub struct Connection { _phantom: PhantomData

, } -pub fn new(codec: Codec) - -> Connection - where T: AsyncRead + AsyncWrite, - P: Peer, - B: IntoBuf, -{ - // TODO: Actually configure - let streams = Streams::new(streams::Config { - max_remote_initiated: None, - init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, - max_local_initiated: None, - init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, - }); - - Connection { - codec: codec, - ping_pong: PingPong::new(), - settings: Settings::new(), - streams: streams, - _phantom: PhantomData, - } -} - impl Connection where T: AsyncRead + AsyncWrite, P: Peer, B: IntoBuf, { + pub fn new(codec: Codec) -> Connection { + // TODO: Actually configure + let streams = Streams::new(streams::Config { + max_remote_initiated: None, + init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, + max_local_initiated: None, + init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, + }); + + Connection { + codec: codec, + ping_pong: PingPong::new(), + settings: Settings::new(), + streams: streams, + _phantom: PhantomData, + } + } + /// Polls for the next update to a remote flow control window. pub fn poll_window_update(&mut self) -> Poll { self.streams.poll_window_update() @@ -250,6 +243,7 @@ impl Connection } } +/* impl Connection where T: AsyncRead + AsyncWrite, B: IntoBuf, @@ -296,6 +290,7 @@ impl Connection }) } } +*/ impl Stream for Connection where T: AsyncRead + AsyncWrite, diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 30f2c6c..2df1eb5 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -13,15 +13,35 @@ use self::ping_pong::PingPong; use self::settings::Settings; use self::streams::Streams; -use {StreamId, Peer}; +use StreamId; use error::Reason; -use frame::Frame; +use frame::{self, Frame}; use futures::*; use bytes::{Buf, IntoBuf}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; +/// Either a Client or a Server +pub trait Peer { + /// Message type sent into the transport + type Send; + + /// Message type polled from the transport + type Poll; + + fn is_server() -> bool; + + #[doc(hidden)] + fn convert_send_message( + id: StreamId, + headers: Self::Send, + end_of_stream: bool) -> frame::Headers; + + #[doc(hidden)] + fn convert_poll_message(headers: frame::Headers) -> Self::Poll; +} + pub type PingPayload = [u8; 8]; pub type WindowSize = u32; @@ -69,7 +89,7 @@ pub fn from_framed_write(framed_write: FramedWrite) let codec = FramedRead::new(framed); - connection::new(codec) + Connection::new(codec) } impl WindowUpdate { diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 27bf157..3cbc39b 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -14,7 +14,7 @@ use self::send::Send; use self::state::State; use self::store::{Store, Entry}; -use {frame, Peer, StreamId, ConnectionError}; +use {frame, StreamId, ConnectionError}; use proto::*; use error::Reason::*; use error::User::*; diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 8dd6074..ce12f02 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,4 +1,4 @@ -use {frame, Peer, ConnectionError}; +use {frame, ConnectionError}; use proto::*; use super::*; diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 875fa06..b36dc32 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -1,4 +1,4 @@ -use {frame, Peer, ConnectionError}; +use {frame, ConnectionError}; use proto::*; use super::*;