From ed472f109ca1ed3a49750c3bd3ba773fd3f463d0 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 11 Sep 2017 14:25:32 -0700 Subject: [PATCH] add client::Builder to configure Clients --- src/client.rs | 114 +++++++++++++++++++++++++--------------- src/frame/settings.rs | 4 ++ src/proto/connection.rs | 6 ++- src/server.rs | 7 ++- tests/stream_states.rs | 5 +- 5 files changed, 86 insertions(+), 50 deletions(-) diff --git a/src/client.rs b/src/client.rs index f3a38ce..8f3520e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,21 +4,20 @@ use frame::Reason::*; use proto::{self, Connection, WindowSize}; use bytes::{Bytes, IntoBuf}; -use futures::{AndThen, Async, AsyncSink, Future, MapErr, Poll, Sink}; +use futures::{Async, Future, MapErr, Poll}; use http::{HeaderMap, Request, Response}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::WriteAll; use std::fmt; -use std::io::Error as IoError; +use std::io; +use std::marker::PhantomData; /// In progress H2 connection binding pub struct Handshake { - inner: AndThen< - MapErr, fn(IoError) -> ::Error>, - Result, ::Error>, - fn((T, &'static [u8])) -> Result, ::Error>, - >, + inner: MapErr, fn(io::Error) -> ::Error>, + settings: Settings, + _marker: PhantomData, } /// Marker type indicating a client peer @@ -36,6 +35,12 @@ pub struct Body { inner: proto::StreamRef, } +/// Build a Client. +#[derive(Debug, Default)] +pub struct Builder { + settings: Settings, +} + #[derive(Debug)] pub(crate) struct Peer; @@ -43,16 +48,7 @@ impl Client where T: AsyncRead + AsyncWrite, { - pub fn handshake(io: T) -> Handshake { - Client::handshake2(io) - } -} -impl Client -where - T: AsyncRead + AsyncWrite, - B: IntoBuf, -{ /// Bind an H2 client connection. /// /// Returns a future which resolves to the connection value once the H2 @@ -60,41 +56,35 @@ where /// /// It's important to note that this does not **flush** the outbound /// settings to the wire. - pub fn handshake2(io: T) -> Handshake { + pub fn handshake(io: T) -> Handshake { + Builder::default().handshake(io) + } +} + +impl Client<(), Bytes> { + /// Creates a Client Builder to customize a Client before binding. + pub fn builder() -> Builder { + Builder::default() + } +} + +impl Client +where T: AsyncRead + AsyncWrite, + B: IntoBuf +{ + fn handshake2(io: T, settings: Settings) -> Handshake { use tokio_io::io; debug!("binding client connection"); - let bind: fn((T, &'static [u8])) - -> Result, ::Error> = |(io, _)| { - debug!("client connection bound"); - - // Create the codec - let mut codec = Codec::new(io); - - // Create the initial SETTINGS frame - let settings = Settings::default(); - - // Send initial settings frame - match codec.start_send(settings.into()) { - Ok(AsyncSink::Ready) => { - let connection = Connection::new(codec); - Ok(Client { - connection, - }) - }, - Ok(_) => unreachable!(), - Err(e) => Err(::Error::from(e)), - } - }; - let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; let handshake = io::write_all(io, msg) - .map_err(::Error::from as fn(IoError) -> ::Error) - .and_then(bind); + .map_err(::Error::from as _); Handshake { inner: handshake, + settings: settings, + _marker: PhantomData, } } @@ -172,6 +162,30 @@ where } } +// ===== impl Builder ===== + +impl Builder { + /// Set the initial window size of the remote peer. + pub fn initial_window_size(&mut self, size: u32) -> &mut Self { + self.settings.set_initial_window_size(Some(size)); + self + } + + /// Bind an H2 client connection. + /// + /// Returns a future which resolves to the connection value once the H2 + /// handshake has been completed. + /// + /// It's important to note that this does not **flush** the outbound + /// settings to the wire. + pub fn handshake(&self, io: T) -> Handshake + where T: AsyncRead + AsyncWrite, + B: IntoBuf + { + Client::handshake2(io, self.settings.clone()) + } +} + // ===== impl Handshake ===== impl Future for Handshake @@ -182,7 +196,21 @@ where type Error = ::Error; fn poll(&mut self) -> Poll { - self.inner.poll() + let (io, _) = try_ready!(self.inner.poll()); + + debug!("client connection bound"); + + // Create the codec + let mut codec = Codec::new(io); + + // Send initial settings frame + codec.buffer(self.settings.clone().into()) + .expect("invalid SETTINGS frame"); + + let connection = Connection::new(codec, &self.settings); + Ok(Async::Ready(Client { + connection, + })) } } diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 69821c8..5da69bf 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -66,6 +66,10 @@ impl Settings { self.initial_window_size } + pub fn set_initial_window_size(&mut self, size: Option) { + self.initial_window_size = size; + } + pub fn max_concurrent_streams(&self) -> Option { self.max_concurrent_streams } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 9c297b2..c4a3edd 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -58,13 +58,15 @@ where P: Peer, B: IntoBuf, { - pub fn new(codec: Codec>) -> Connection { + pub fn new(codec: Codec>, settings: &frame::Settings) -> Connection { // TODO: Actually configure let streams = Streams::new(streams::Config { max_remote_initiated: None, init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, max_local_initiated: None, - init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, + init_local_window_sz: settings + .initial_window_size() + .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), }); Connection { diff --git a/src/server.rs b/src/server.rs index f53c9f9..b5cf7ef 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,5 @@ use codec::{Codec, RecvError}; -use frame::{self, Reason, StreamId}; +use frame::{self, Reason, Settings, StreamId}; use frame::Reason::*; use proto::{self, Connection, WindowSize}; @@ -82,19 +82,18 @@ where let mut codec = Codec::new(io); // Create the initial SETTINGS frame - let settings = frame::Settings::default(); + let settings = Settings::default(); // Send initial settings frame codec .buffer(settings.into()) - .ok() .expect("invalid SETTINGS frame"); // Flush pending settings frame and then wait for the client preface let handshake = Flush::new(codec) .and_then(ReadPreface::new) .map(move |codec| { - let connection = Connection::new(codec); + let connection = Connection::new(codec, &Settings::default()); Server { connection, } diff --git a/tests/stream_states.rs b/tests/stream_states.rs index cf0be67..738bc0e 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -62,7 +62,10 @@ fn send_recv_data() { ]) .build(); - let mut h2 = Client::handshake2(mock).wait().unwrap(); + let mut h2 = Client::builder() + .handshake(mock) + .wait() + .unwrap(); let request = Request::builder() .method(Method::POST)