From d0afe30ab3039ff8cdf086c5f2a1ec5eb3ce677b Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 11 Sep 2017 14:24:01 -0700 Subject: [PATCH 1/4] fix recv connection flow should always use default initial window size --- src/proto/streams/recv.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index e0f041c..1db912b 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,7 +1,7 @@ use super::*; use {client, frame, proto, server}; use codec::{RecvError, UserError}; -use frame::Reason; +use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE}; use proto::*; use http::HeaderMap; @@ -64,13 +64,14 @@ where let mut flow = FlowControl::new(); - flow.inc_window(config.init_remote_window_sz) - .ok() + // connections always have the default window size, regardless of + // settings + flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE) .expect("invalid initial remote window size"); - flow.assign_capacity(config.init_remote_window_sz); + flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE); Recv { - init_window_sz: config.init_remote_window_sz, + init_window_sz: config.init_local_window_sz, flow: flow, next_stream_id: next_stream_id.into(), pending_window_updates: store::Queue::new(), From ed472f109ca1ed3a49750c3bd3ba773fd3f463d0 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 11 Sep 2017 14:25:32 -0700 Subject: [PATCH 2/4] add client::Builder to configure Clients --- src/client.rs | 114 +++++++++++++++++++++++++--------------- src/frame/settings.rs | 4 ++ src/proto/connection.rs | 6 ++- src/server.rs | 7 ++- tests/stream_states.rs | 5 +- 5 files changed, 86 insertions(+), 50 deletions(-) diff --git a/src/client.rs b/src/client.rs index f3a38ce..8f3520e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,21 +4,20 @@ use frame::Reason::*; use proto::{self, Connection, WindowSize}; use bytes::{Bytes, IntoBuf}; -use futures::{AndThen, Async, AsyncSink, Future, MapErr, Poll, Sink}; +use futures::{Async, Future, MapErr, Poll}; use http::{HeaderMap, Request, Response}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::WriteAll; use std::fmt; -use std::io::Error as IoError; +use std::io; +use std::marker::PhantomData; /// In progress H2 connection binding pub struct Handshake { - inner: AndThen< - MapErr, fn(IoError) -> ::Error>, - Result, ::Error>, - fn((T, &'static [u8])) -> Result, ::Error>, - >, + inner: MapErr, fn(io::Error) -> ::Error>, + settings: Settings, + _marker: PhantomData, } /// Marker type indicating a client peer @@ -36,6 +35,12 @@ pub struct Body { inner: proto::StreamRef, } +/// Build a Client. +#[derive(Debug, Default)] +pub struct Builder { + settings: Settings, +} + #[derive(Debug)] pub(crate) struct Peer; @@ -43,16 +48,7 @@ impl Client where T: AsyncRead + AsyncWrite, { - pub fn handshake(io: T) -> Handshake { - Client::handshake2(io) - } -} -impl Client -where - T: AsyncRead + AsyncWrite, - B: IntoBuf, -{ /// Bind an H2 client connection. /// /// Returns a future which resolves to the connection value once the H2 @@ -60,41 +56,35 @@ where /// /// It's important to note that this does not **flush** the outbound /// settings to the wire. - pub fn handshake2(io: T) -> Handshake { + pub fn handshake(io: T) -> Handshake { + Builder::default().handshake(io) + } +} + +impl Client<(), Bytes> { + /// Creates a Client Builder to customize a Client before binding. + pub fn builder() -> Builder { + Builder::default() + } +} + +impl Client +where T: AsyncRead + AsyncWrite, + B: IntoBuf +{ + fn handshake2(io: T, settings: Settings) -> Handshake { use tokio_io::io; debug!("binding client connection"); - let bind: fn((T, &'static [u8])) - -> Result, ::Error> = |(io, _)| { - debug!("client connection bound"); - - // Create the codec - let mut codec = Codec::new(io); - - // Create the initial SETTINGS frame - let settings = Settings::default(); - - // Send initial settings frame - match codec.start_send(settings.into()) { - Ok(AsyncSink::Ready) => { - let connection = Connection::new(codec); - Ok(Client { - connection, - }) - }, - Ok(_) => unreachable!(), - Err(e) => Err(::Error::from(e)), - } - }; - let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; let handshake = io::write_all(io, msg) - .map_err(::Error::from as fn(IoError) -> ::Error) - .and_then(bind); + .map_err(::Error::from as _); Handshake { inner: handshake, + settings: settings, + _marker: PhantomData, } } @@ -172,6 +162,30 @@ where } } +// ===== impl Builder ===== + +impl Builder { + /// Set the initial window size of the remote peer. + pub fn initial_window_size(&mut self, size: u32) -> &mut Self { + self.settings.set_initial_window_size(Some(size)); + self + } + + /// Bind an H2 client connection. + /// + /// Returns a future which resolves to the connection value once the H2 + /// handshake has been completed. + /// + /// It's important to note that this does not **flush** the outbound + /// settings to the wire. + pub fn handshake(&self, io: T) -> Handshake + where T: AsyncRead + AsyncWrite, + B: IntoBuf + { + Client::handshake2(io, self.settings.clone()) + } +} + // ===== impl Handshake ===== impl Future for Handshake @@ -182,7 +196,21 @@ where type Error = ::Error; fn poll(&mut self) -> Poll { - self.inner.poll() + let (io, _) = try_ready!(self.inner.poll()); + + debug!("client connection bound"); + + // Create the codec + let mut codec = Codec::new(io); + + // Send initial settings frame + codec.buffer(self.settings.clone().into()) + .expect("invalid SETTINGS frame"); + + let connection = Connection::new(codec, &self.settings); + Ok(Async::Ready(Client { + connection, + })) } } diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 69821c8..5da69bf 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -66,6 +66,10 @@ impl Settings { self.initial_window_size } + pub fn set_initial_window_size(&mut self, size: Option) { + self.initial_window_size = size; + } + pub fn max_concurrent_streams(&self) -> Option { self.max_concurrent_streams } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 9c297b2..c4a3edd 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -58,13 +58,15 @@ where P: Peer, B: IntoBuf, { - pub fn new(codec: Codec>) -> Connection { + pub fn new(codec: Codec>, settings: &frame::Settings) -> Connection { // TODO: Actually configure let streams = Streams::new(streams::Config { max_remote_initiated: None, init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, max_local_initiated: None, - init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, + init_local_window_sz: settings + .initial_window_size() + .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), }); Connection { diff --git a/src/server.rs b/src/server.rs index f53c9f9..b5cf7ef 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,5 @@ use codec::{Codec, RecvError}; -use frame::{self, Reason, StreamId}; +use frame::{self, Reason, Settings, StreamId}; use frame::Reason::*; use proto::{self, Connection, WindowSize}; @@ -82,19 +82,18 @@ where let mut codec = Codec::new(io); // Create the initial SETTINGS frame - let settings = frame::Settings::default(); + let settings = Settings::default(); // Send initial settings frame codec .buffer(settings.into()) - .ok() .expect("invalid SETTINGS frame"); // Flush pending settings frame and then wait for the client preface let handshake = Flush::new(codec) .and_then(ReadPreface::new) .map(move |codec| { - let connection = Connection::new(codec); + let connection = Connection::new(codec, &Settings::default()); Server { connection, } diff --git a/tests/stream_states.rs b/tests/stream_states.rs index cf0be67..738bc0e 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -62,7 +62,10 @@ fn send_recv_data() { ]) .build(); - let mut h2 = Client::handshake2(mock).wait().unwrap(); + let mut h2 = Client::builder() + .handshake(mock) + .wait() + .unwrap(); let request = Request::builder() .method(Method::POST) From e2cda1860b4e90d7432582603d503d8e7b2a643e Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 11 Sep 2017 14:46:46 -0700 Subject: [PATCH 3/4] fix Body to return errors when there is recv error --- src/proto/streams/recv.rs | 33 +++++++++++++-------------------- src/proto/streams/state.rs | 5 +++-- tests/flow_control.rs | 11 +---------- 3 files changed, 17 insertions(+), 32 deletions(-) diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 1db912b..c810edf 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -565,16 +565,7 @@ where // No more data frames Ok(None.into()) }, - None => { - if stream.state.is_recv_closed() { - // No more data frames will be received - Ok(None.into()) - } else { - // Request to get notified once more data frames arrive - stream.recv_task = Some(task::current()); - Ok(Async::NotReady) - } - }, + None => self.schedule_recv(stream), } } @@ -590,16 +581,18 @@ where // we do? unimplemented!(); }, - None => { - if stream.state.is_recv_closed() { - // There will be no trailer frame - Ok(None.into()) - } else { - // Request to get notified once another frame arrives - stream.recv_task = Some(task::current()); - Ok(Async::NotReady) - } - }, + None => self.schedule_recv(stream), + } + } + + fn schedule_recv(&mut self, stream: &mut Stream) -> Poll, proto::Error> { + if stream.state.ensure_recv_open()? { + // Request to get notified once more frames arrive + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } else { + // No more frames will be received + Ok(None.into()) } } } diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 200e3b0..c6efdd5 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -314,14 +314,15 @@ impl State { } } - pub fn ensure_recv_open(&self) -> Result<(), proto::Error> { + pub fn ensure_recv_open(&self) -> Result { use std::io; // TODO: Is this correct? match self.inner { Closed(Some(Cause::Proto(reason))) => Err(proto::Error::Proto(reason)), Closed(Some(Cause::Io)) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())), - _ => Ok(()), + Closed(None) | HalfClosedRemote(..) => Ok(false), + _ => Ok(true), } } } diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 41c3ef0..227e1a8 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -225,17 +225,8 @@ fn recv_data_overflows_connection_window() { .and_then(|resp| { assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_parts().1; + // FIXME: body stream should error also body.concat2().unwrap() - /* FIXME: body stream should error also - .then(|res| { - let err = res.unwrap_err(); - assert_eq!( - err.to_string(), - "protocol error: flow-control protocol violated" - ); - Ok::<(), ()>(()) - }) - */ }); // client should see a flow control error From 3ec0e85e56a04c2d1f1ce9e866937b0ff19bb96b Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 11 Sep 2017 15:05:52 -0700 Subject: [PATCH 4/4] add test when stream window overflows before conn window --- src/client.rs | 19 +++--- src/proto/connection.rs | 6 +- tests/flow_control.rs | 77 +++++++++++++++++++++++- tests/stream_states.rs | 5 +- tests/support/src/frames.rs | 113 +++++++++++++++++------------------- tests/support/src/mock.rs | 8 +++ 6 files changed, 151 insertions(+), 77 deletions(-) diff --git a/src/client.rs b/src/client.rs index 8f3520e..e35269e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -48,7 +48,6 @@ impl Client where T: AsyncRead + AsyncWrite, { - /// Bind an H2 client connection. /// /// Returns a future which resolves to the connection value once the H2 @@ -69,8 +68,9 @@ impl Client<(), Bytes> { } impl Client -where T: AsyncRead + AsyncWrite, - B: IntoBuf +where + T: AsyncRead + AsyncWrite, + B: IntoBuf, { fn handshake2(io: T, settings: Settings) -> Handshake { use tokio_io::io; @@ -78,8 +78,7 @@ where T: AsyncRead + AsyncWrite, debug!("binding client connection"); let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; - let handshake = io::write_all(io, msg) - .map_err(::Error::from as _); + let handshake = io::write_all(io, msg).map_err(::Error::from as _); Handshake { inner: handshake, @@ -179,8 +178,9 @@ impl Builder { /// It's important to note that this does not **flush** the outbound /// settings to the wire. pub fn handshake(&self, io: T) -> Handshake - where T: AsyncRead + AsyncWrite, - B: IntoBuf + where + T: AsyncRead + AsyncWrite, + B: IntoBuf, { Client::handshake2(io, self.settings.clone()) } @@ -204,8 +204,9 @@ where let mut codec = Codec::new(io); // Send initial settings frame - codec.buffer(self.settings.clone().into()) - .expect("invalid SETTINGS frame"); + codec + .buffer(self.settings.clone().into()) + .expect("invalid SETTINGS frame"); let connection = Connection::new(codec, &self.settings); Ok(Async::Ready(Client { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index c4a3edd..bbf74f3 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -58,7 +58,10 @@ where P: Peer, B: IntoBuf, { - pub fn new(codec: Codec>, settings: &frame::Settings) -> Connection { + pub fn new( + codec: Codec>, + settings: &frame::Settings, + ) -> Connection { // TODO: Actually configure let streams = Streams::new(streams::Config { max_remote_initiated: None, @@ -68,7 +71,6 @@ where .initial_window_size() .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), }); - Connection { state: State::Open, codec: codec, diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 227e1a8..47f8350 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -225,8 +225,14 @@ fn recv_data_overflows_connection_window() { .and_then(|resp| { assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_parts().1; - // FIXME: body stream should error also - body.concat2().unwrap() + body.concat2().then(|res| { + let err = res.unwrap_err(); + assert_eq!( + err.to_string(), + "protocol error: flow-control protocol violated" + ); + Ok::<(), ()>(()) + }) }); // client should see a flow control error @@ -244,11 +250,76 @@ fn recv_data_overflows_connection_window() { } #[test] -#[ignore] fn recv_data_overflows_stream_window() { // this tests for when streams have smaller windows than their connection + let _ = ::env_logger::init(); + + let (io, srv) = mock::new(); + + let mock = srv.assert_client_handshake().unwrap() + .ignore_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos() + ) + .send_frame( + frames::headers(1) + .response(200) + ) + // fill the whole window + .send_frame(frames::data(1, vec![0u8; 16_384])) + // this frame overflows the window! + .send_frame(frames::data(1, &[0; 16][..]).eos()) + // expecting goaway for the conn + // TODO: change to a RST_STREAM eventually + .recv_frame(frames::go_away(0).flow_control()) + // close the connection + .map(drop); + + let h2 = Client::builder() + .initial_window_size(16_384) + .handshake::<_, Bytes>(io) + .unwrap() + .and_then(|mut h2| { + let request = Request::builder() + .method(Method::GET) + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + + let req = h2.request(request, true) + .unwrap() + .unwrap() + .and_then(|resp| { + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.into_parts().1; + body.concat2().then(|res| { + let err = res.unwrap_err(); + assert_eq!( + err.to_string(), + "protocol error: flow-control protocol violated" + ); + Ok::<(), ()>(()) + }) + }); + + // client should see a flow control error + let conn = h2.then(|res| { + let err = res.unwrap_err(); + assert_eq!( + err.to_string(), + "protocol error: flow-control protocol violated" + ); + Ok::<(), ()>(()) + }); + conn.unwrap().join(req) + }); + h2.join(mock).wait().unwrap(); } + + #[test] #[ignore] fn recv_window_update_causes_overflow() { diff --git a/tests/stream_states.rs b/tests/stream_states.rs index 738bc0e..ff1be13 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -62,10 +62,7 @@ fn send_recv_data() { ]) .build(); - let mut h2 = Client::builder() - .handshake(mock) - .wait() - .unwrap(); + let mut h2 = Client::builder().handshake(mock).wait().unwrap(); let request = Request::builder() .method(Method::POST) diff --git a/tests/support/src/frames.rs b/tests/support/src/frames.rs index 3e86753..af5719a 100644 --- a/tests/support/src/frames.rs +++ b/tests/support/src/frames.rs @@ -11,21 +11,21 @@ pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; // ==== helper functions to easily construct h2 Frames ==== -pub fn headers(id: T) -> MockHeaders +pub fn headers(id: T) -> Mock where T: Into, { - MockHeaders(frame::Headers::new( + Mock(frame::Headers::new( id.into(), frame::Pseudo::default(), HeaderMap::default(), )) } -pub fn data(id: T, buf: B) -> MockData +pub fn data(id: T, buf: B) -> Mock where T: Into, B: Into, { - MockData(frame::Data::new(id.into(), buf.into())) + Mock(frame::Data::new(id.into(), buf.into())) } pub fn window_update(id: T, sz: u32) -> frame::WindowUpdate @@ -34,17 +34,38 @@ pub fn window_update(id: T, sz: u32) -> frame::WindowUpdate frame::WindowUpdate::new(id.into(), sz) } -pub fn go_away(id: T) -> MockGoAway +pub fn go_away(id: T) -> Mock where T: Into, { - MockGoAway(frame::GoAway::new(id.into(), frame::Reason::NoError)) + Mock(frame::GoAway::new(id.into(), frame::Reason::NoError)) +} + +pub fn reset(id: T) -> Mock + where T: Into, +{ + Mock(frame::Reset::new(id.into(), frame::Reason::NoError)) +} + +// === Generic helpers of all frame types + +pub struct Mock(T); + +impl fmt::Debug for Mock { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.0, f) + } +} + +impl From> for Frame +where T: Into { + fn from(src: Mock) -> Self { + src.0.into() + } } // Headers helpers -pub struct MockHeaders(frame::Headers); - -impl MockHeaders { +impl Mock { pub fn request(self, method: M, uri: U) -> Self where M: HttpTryInto, U: HttpTryInto, @@ -57,7 +78,7 @@ impl MockHeaders { frame::Pseudo::request(method, uri), fields ); - MockHeaders(frame) + Mock(frame) } pub fn response(self, status: S) -> Self @@ -70,13 +91,13 @@ impl MockHeaders { frame::Pseudo::response(status), fields ); - MockHeaders(frame) + Mock(frame) } pub fn fields(self, fields: HeaderMap) -> Self { let (id, pseudo, _) = self.into_parts(); let frame = frame::Headers::new(id, pseudo, fields); - MockHeaders(frame) + Mock(frame) } pub fn eos(mut self) -> Self { @@ -93,49 +114,23 @@ impl MockHeaders { } } -impl fmt::Debug for MockHeaders { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(&self.0, f) - } -} - -impl From for Frame { - fn from(src: MockHeaders) -> Self { - Frame::Headers(src.0) - } -} - -impl From for SendFrame { - fn from(src: MockHeaders) -> Self { +impl From> for SendFrame { + fn from(src: Mock) -> Self { Frame::Headers(src.0) } } // Data helpers -pub struct MockData(frame::Data); - -impl MockData { +impl Mock { pub fn eos(mut self) -> Self { self.0.set_end_stream(true); self } } -impl fmt::Debug for MockData { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(&self.0, f) - } -} - -impl From for Frame { - fn from(src: MockData) -> Self { - Frame::Data(src.0) - } -} - -impl From for SendFrame { - fn from(src: MockData) -> Self { +impl From> for SendFrame { + fn from(src: Mock) -> Self { let id = src.0.stream_id(); let eos = src.0.is_end_stream(); let payload = src.0.into_payload(); @@ -145,32 +140,32 @@ impl From for SendFrame { } } - // GoAway helpers -pub struct MockGoAway(frame::GoAway); - -impl MockGoAway { +impl Mock { pub fn flow_control(self) -> Self { - MockGoAway(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FlowControlError)) + Mock(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FlowControlError)) } } -impl fmt::Debug for MockGoAway { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(&self.0, f) - } -} - -impl From for Frame { - fn from(src: MockGoAway) -> Self { +impl From> for SendFrame { + fn from(src: Mock) -> Self { Frame::GoAway(src.0) } } -impl From for SendFrame { - fn from(src: MockGoAway) -> Self { - Frame::GoAway(src.0) +// ==== Reset helpers + +impl Mock { + pub fn flow_control(self) -> Self { + let id = self.0.stream_id(); + Mock(frame::Reset::new(id, frame::Reason::FlowControlError)) + } +} + +impl From> for SendFrame { + fn from(src: Mock) -> Self { + Frame::Reset(src.0) } } diff --git a/tests/support/src/mock.rs b/tests/support/src/mock.rs index dc22a26..67e4731 100644 --- a/tests/support/src/mock.rs +++ b/tests/support/src/mock.rs @@ -326,6 +326,14 @@ pub trait HandleFutureExt { } } + fn ignore_settings(self) -> Box> + where Self: Sized + 'static, + Self: Future, + Self::Error: fmt::Debug, + { + Box::new(self.map(|(_settings, handle)| handle).unwrap()) + } + fn recv_frame(self, frame: T) -> RecvFrame<::Future> where Self: IntoRecvFrame + Sized, T: Into,