Merge pull request #79 from seanmonstar/client-config

Intial Client Configuration
This commit is contained in:
Sean McArthur
2017-09-13 14:55:36 -07:00
committed by GitHub
10 changed files with 247 additions and 151 deletions

View File

@@ -4,21 +4,20 @@ use frame::Reason::*;
use proto::{self, Connection, WindowSize};
use bytes::{Bytes, IntoBuf};
use futures::{AndThen, Async, AsyncSink, Future, MapErr, Poll, Sink};
use futures::{Async, Future, MapErr, Poll};
use http::{HeaderMap, Request, Response};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::WriteAll;
use std::fmt;
use std::io::Error as IoError;
use std::io;
use std::marker::PhantomData;
/// In progress H2 connection binding
pub struct Handshake<T: AsyncRead + AsyncWrite, B: IntoBuf = Bytes> {
inner: AndThen<
MapErr<WriteAll<T, &'static [u8]>, fn(IoError) -> ::Error>,
Result<Client<T, B>, ::Error>,
fn((T, &'static [u8])) -> Result<Client<T, B>, ::Error>,
>,
inner: MapErr<WriteAll<T, &'static [u8]>, fn(io::Error) -> ::Error>,
settings: Settings,
_marker: PhantomData<B>,
}
/// Marker type indicating a client peer
@@ -36,22 +35,18 @@ pub struct Body<B: IntoBuf> {
inner: proto::StreamRef<B::Buf, Peer>,
}
/// Build a Client.
#[derive(Debug, Default)]
pub struct Builder {
settings: Settings,
}
#[derive(Debug)]
pub(crate) struct Peer;
impl<T> Client<T, Bytes>
where
T: AsyncRead + AsyncWrite,
{
pub fn handshake(io: T) -> Handshake<T, Bytes> {
Client::handshake2(io)
}
}
impl<T, B> Client<T, B>
where
T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
/// Bind an H2 client connection.
///
@@ -60,41 +55,35 @@ where
///
/// It's important to note that this does not **flush** the outbound
/// settings to the wire.
pub fn handshake2(io: T) -> Handshake<T, B> {
pub fn handshake(io: T) -> Handshake<T, Bytes> {
Builder::default().handshake(io)
}
}
impl Client<(), Bytes> {
/// Creates a Client Builder to customize a Client before binding.
pub fn builder() -> Builder {
Builder::default()
}
}
impl<T, B> Client<T, B>
where
T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
fn handshake2(io: T, settings: Settings) -> Handshake<T, B> {
use tokio_io::io;
debug!("binding client connection");
let bind: fn((T, &'static [u8]))
-> Result<Client<T, B>, ::Error> = |(io, _)| {
debug!("client connection bound");
// Create the codec
let mut codec = Codec::new(io);
// Create the initial SETTINGS frame
let settings = Settings::default();
// Send initial settings frame
match codec.start_send(settings.into()) {
Ok(AsyncSink::Ready) => {
let connection = Connection::new(codec);
Ok(Client {
connection,
})
},
Ok(_) => unreachable!(),
Err(e) => Err(::Error::from(e)),
}
};
let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
let handshake = io::write_all(io, msg)
.map_err(::Error::from as fn(IoError) -> ::Error)
.and_then(bind);
let handshake = io::write_all(io, msg).map_err(::Error::from as _);
Handshake {
inner: handshake,
settings: settings,
_marker: PhantomData,
}
}
@@ -172,6 +161,31 @@ where
}
}
// ===== impl Builder =====
impl Builder {
/// Set the initial window size of the remote peer.
pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
self.settings.set_initial_window_size(Some(size));
self
}
/// Bind an H2 client connection.
///
/// Returns a future which resolves to the connection value once the H2
/// handshake has been completed.
///
/// It's important to note that this does not **flush** the outbound
/// settings to the wire.
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
Client::handshake2(io, self.settings.clone())
}
}
// ===== impl Handshake =====
impl<T, B: IntoBuf> Future for Handshake<T, B>
@@ -182,7 +196,22 @@ where
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
let (io, _) = try_ready!(self.inner.poll());
debug!("client connection bound");
// Create the codec
let mut codec = Codec::new(io);
// Send initial settings frame
codec
.buffer(self.settings.clone().into())
.expect("invalid SETTINGS frame");
let connection = Connection::new(codec, &self.settings);
Ok(Async::Ready(Client {
connection,
}))
}
}

View File

@@ -66,6 +66,10 @@ impl Settings {
self.initial_window_size
}
pub fn set_initial_window_size(&mut self, size: Option<u32>) {
self.initial_window_size = size;
}
pub fn max_concurrent_streams(&self) -> Option<u32> {
self.max_concurrent_streams
}

View File

@@ -58,15 +58,19 @@ where
P: Peer,
B: IntoBuf,
{
pub fn new(codec: Codec<T, Prioritized<B::Buf>>) -> Connection<T, P, B> {
pub fn new(
codec: Codec<T, Prioritized<B::Buf>>,
settings: &frame::Settings,
) -> Connection<T, P, B> {
// TODO: Actually configure
let streams = Streams::new(streams::Config {
max_remote_initiated: None,
init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
max_local_initiated: None,
init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
init_local_window_sz: settings
.initial_window_size()
.unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE),
});
Connection {
state: State::Open,
codec: codec,

View File

@@ -1,7 +1,7 @@
use super::*;
use {client, frame, proto, server};
use codec::{RecvError, UserError};
use frame::Reason;
use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use proto::*;
use http::HeaderMap;
@@ -64,13 +64,14 @@ where
let mut flow = FlowControl::new();
flow.inc_window(config.init_remote_window_sz)
.ok()
// connections always have the default window size, regardless of
// settings
flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
.expect("invalid initial remote window size");
flow.assign_capacity(config.init_remote_window_sz);
flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE);
Recv {
init_window_sz: config.init_remote_window_sz,
init_window_sz: config.init_local_window_sz,
flow: flow,
next_stream_id: next_stream_id.into(),
pending_window_updates: store::Queue::new(),
@@ -564,16 +565,7 @@ where
// No more data frames
Ok(None.into())
},
None => {
if stream.state.is_recv_closed() {
// No more data frames will be received
Ok(None.into())
} else {
// Request to get notified once more data frames arrive
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
}
},
None => self.schedule_recv(stream),
}
}
@@ -589,16 +581,18 @@ where
// we do?
unimplemented!();
},
None => {
if stream.state.is_recv_closed() {
// There will be no trailer frame
Ok(None.into())
} else {
// Request to get notified once another frame arrives
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
}
},
None => self.schedule_recv(stream),
}
}
fn schedule_recv<T>(&mut self, stream: &mut Stream<B, P>) -> Poll<Option<T>, proto::Error> {
if stream.state.ensure_recv_open()? {
// Request to get notified once more frames arrive
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
} else {
// No more frames will be received
Ok(None.into())
}
}
}

View File

@@ -314,14 +314,15 @@ impl State {
}
}
pub fn ensure_recv_open(&self) -> Result<(), proto::Error> {
pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
use std::io;
// TODO: Is this correct?
match self.inner {
Closed(Some(Cause::Proto(reason))) => Err(proto::Error::Proto(reason)),
Closed(Some(Cause::Io)) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())),
_ => Ok(()),
Closed(None) | HalfClosedRemote(..) => Ok(false),
_ => Ok(true),
}
}
}

View File

@@ -1,5 +1,5 @@
use codec::{Codec, RecvError};
use frame::{self, Reason, StreamId};
use frame::{self, Reason, Settings, StreamId};
use frame::Reason::*;
use proto::{self, Connection, WindowSize};
@@ -82,19 +82,18 @@ where
let mut codec = Codec::new(io);
// Create the initial SETTINGS frame
let settings = frame::Settings::default();
let settings = Settings::default();
// Send initial settings frame
codec
.buffer(settings.into())
.ok()
.expect("invalid SETTINGS frame");
// Flush pending settings frame and then wait for the client preface
let handshake = Flush::new(codec)
.and_then(ReadPreface::new)
.map(move |codec| {
let connection = Connection::new(codec);
let connection = Connection::new(codec, &Settings::default());
Server {
connection,
}

View File

@@ -225,17 +225,14 @@ fn recv_data_overflows_connection_window() {
.and_then(|resp| {
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_parts().1;
body.concat2().unwrap()
/* FIXME: body stream should error also
.then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: flow-control protocol violated"
);
Ok::<(), ()>(())
})
*/
body.concat2().then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: flow-control protocol violated"
);
Ok::<(), ()>(())
})
});
// client should see a flow control error
@@ -253,11 +250,76 @@ fn recv_data_overflows_connection_window() {
}
#[test]
#[ignore]
fn recv_data_overflows_stream_window() {
// this tests for when streams have smaller windows than their connection
let _ = ::env_logger::init();
let (io, srv) = mock::new();
let mock = srv.assert_client_handshake().unwrap()
.ignore_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos()
)
.send_frame(
frames::headers(1)
.response(200)
)
// fill the whole window
.send_frame(frames::data(1, vec![0u8; 16_384]))
// this frame overflows the window!
.send_frame(frames::data(1, &[0; 16][..]).eos())
// expecting goaway for the conn
// TODO: change to a RST_STREAM eventually
.recv_frame(frames::go_away(0).flow_control())
// close the connection
.map(drop);
let h2 = Client::builder()
.initial_window_size(16_384)
.handshake::<_, Bytes>(io)
.unwrap()
.and_then(|mut h2| {
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = h2.request(request, true)
.unwrap()
.unwrap()
.and_then(|resp| {
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_parts().1;
body.concat2().then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: flow-control protocol violated"
);
Ok::<(), ()>(())
})
});
// client should see a flow control error
let conn = h2.then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: flow-control protocol violated"
);
Ok::<(), ()>(())
});
conn.unwrap().join(req)
});
h2.join(mock).wait().unwrap();
}
#[test]
#[ignore]
fn recv_window_update_causes_overflow() {

View File

@@ -62,7 +62,7 @@ fn send_recv_data() {
])
.build();
let mut h2 = Client::handshake2(mock).wait().unwrap();
let mut h2 = Client::builder().handshake(mock).wait().unwrap();
let request = Request::builder()
.method(Method::POST)

View File

@@ -11,21 +11,21 @@ pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
// ==== helper functions to easily construct h2 Frames ====
pub fn headers<T>(id: T) -> MockHeaders
pub fn headers<T>(id: T) -> Mock<frame::Headers>
where T: Into<StreamId>,
{
MockHeaders(frame::Headers::new(
Mock(frame::Headers::new(
id.into(),
frame::Pseudo::default(),
HeaderMap::default(),
))
}
pub fn data<T, B>(id: T, buf: B) -> MockData
pub fn data<T, B>(id: T, buf: B) -> Mock<frame::Data>
where T: Into<StreamId>,
B: Into<Bytes>,
{
MockData(frame::Data::new(id.into(), buf.into()))
Mock(frame::Data::new(id.into(), buf.into()))
}
pub fn window_update<T>(id: T, sz: u32) -> frame::WindowUpdate
@@ -34,17 +34,38 @@ pub fn window_update<T>(id: T, sz: u32) -> frame::WindowUpdate
frame::WindowUpdate::new(id.into(), sz)
}
pub fn go_away<T>(id: T) -> MockGoAway
pub fn go_away<T>(id: T) -> Mock<frame::GoAway>
where T: Into<StreamId>,
{
MockGoAway(frame::GoAway::new(id.into(), frame::Reason::NoError))
Mock(frame::GoAway::new(id.into(), frame::Reason::NoError))
}
pub fn reset<T>(id: T) -> Mock<frame::Reset>
where T: Into<StreamId>,
{
Mock(frame::Reset::new(id.into(), frame::Reason::NoError))
}
// === Generic helpers of all frame types
pub struct Mock<T>(T);
impl<T: fmt::Debug> fmt::Debug for Mock<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
impl<T> From<Mock<T>> for Frame
where T: Into<Frame> {
fn from(src: Mock<T>) -> Self {
src.0.into()
}
}
// Headers helpers
pub struct MockHeaders(frame::Headers);
impl MockHeaders {
impl Mock<frame::Headers> {
pub fn request<M, U>(self, method: M, uri: U) -> Self
where M: HttpTryInto<http::Method>,
U: HttpTryInto<http::Uri>,
@@ -57,7 +78,7 @@ impl MockHeaders {
frame::Pseudo::request(method, uri),
fields
);
MockHeaders(frame)
Mock(frame)
}
pub fn response<S>(self, status: S) -> Self
@@ -70,13 +91,13 @@ impl MockHeaders {
frame::Pseudo::response(status),
fields
);
MockHeaders(frame)
Mock(frame)
}
pub fn fields(self, fields: HeaderMap) -> Self {
let (id, pseudo, _) = self.into_parts();
let frame = frame::Headers::new(id, pseudo, fields);
MockHeaders(frame)
Mock(frame)
}
pub fn eos(mut self) -> Self {
@@ -93,49 +114,23 @@ impl MockHeaders {
}
}
impl fmt::Debug for MockHeaders {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
impl From<MockHeaders> for Frame {
fn from(src: MockHeaders) -> Self {
Frame::Headers(src.0)
}
}
impl From<MockHeaders> for SendFrame {
fn from(src: MockHeaders) -> Self {
impl From<Mock<frame::Headers>> for SendFrame {
fn from(src: Mock<frame::Headers>) -> Self {
Frame::Headers(src.0)
}
}
// Data helpers
pub struct MockData(frame::Data);
impl MockData {
impl Mock<frame::Data> {
pub fn eos(mut self) -> Self {
self.0.set_end_stream(true);
self
}
}
impl fmt::Debug for MockData {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
impl From<MockData> for Frame {
fn from(src: MockData) -> Self {
Frame::Data(src.0)
}
}
impl From<MockData> for SendFrame {
fn from(src: MockData) -> Self {
impl From<Mock<frame::Data>> for SendFrame {
fn from(src: Mock<frame::Data>) -> Self {
let id = src.0.stream_id();
let eos = src.0.is_end_stream();
let payload = src.0.into_payload();
@@ -145,32 +140,32 @@ impl From<MockData> for SendFrame {
}
}
// GoAway helpers
pub struct MockGoAway(frame::GoAway);
impl MockGoAway {
impl Mock<frame::GoAway> {
pub fn flow_control(self) -> Self {
MockGoAway(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FlowControlError))
Mock(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FlowControlError))
}
}
impl fmt::Debug for MockGoAway {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
impl From<MockGoAway> for Frame {
fn from(src: MockGoAway) -> Self {
impl From<Mock<frame::GoAway>> for SendFrame {
fn from(src: Mock<frame::GoAway>) -> Self {
Frame::GoAway(src.0)
}
}
impl From<MockGoAway> for SendFrame {
fn from(src: MockGoAway) -> Self {
Frame::GoAway(src.0)
// ==== Reset helpers
impl Mock<frame::Reset> {
pub fn flow_control(self) -> Self {
let id = self.0.stream_id();
Mock(frame::Reset::new(id, frame::Reason::FlowControlError))
}
}
impl From<Mock<frame::Reset>> for SendFrame {
fn from(src: Mock<frame::Reset>) -> Self {
Frame::Reset(src.0)
}
}

View File

@@ -326,6 +326,14 @@ pub trait HandleFutureExt {
}
}
fn ignore_settings(self) -> Box<Future<Item=Handle, Error=()>>
where Self: Sized + 'static,
Self: Future<Item=(frame::Settings, Handle)>,
Self::Error: fmt::Debug,
{
Box::new(self.map(|(_settings, handle)| handle).unwrap())
}
fn recv_frame<T>(self, frame: T) -> RecvFrame<<Self as IntoRecvFrame>::Future>
where Self: IntoRecvFrame + Sized,
T: Into<Frame>,