From 3ec0e85e56a04c2d1f1ce9e866937b0ff19bb96b Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 11 Sep 2017 15:05:52 -0700 Subject: [PATCH] add test when stream window overflows before conn window --- src/client.rs | 19 +++--- src/proto/connection.rs | 6 +- tests/flow_control.rs | 77 +++++++++++++++++++++++- tests/stream_states.rs | 5 +- tests/support/src/frames.rs | 113 +++++++++++++++++------------------- tests/support/src/mock.rs | 8 +++ 6 files changed, 151 insertions(+), 77 deletions(-) diff --git a/src/client.rs b/src/client.rs index 8f3520e..e35269e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -48,7 +48,6 @@ impl Client where T: AsyncRead + AsyncWrite, { - /// Bind an H2 client connection. /// /// Returns a future which resolves to the connection value once the H2 @@ -69,8 +68,9 @@ impl Client<(), Bytes> { } impl Client -where T: AsyncRead + AsyncWrite, - B: IntoBuf +where + T: AsyncRead + AsyncWrite, + B: IntoBuf, { fn handshake2(io: T, settings: Settings) -> Handshake { use tokio_io::io; @@ -78,8 +78,7 @@ where T: AsyncRead + AsyncWrite, debug!("binding client connection"); let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; - let handshake = io::write_all(io, msg) - .map_err(::Error::from as _); + let handshake = io::write_all(io, msg).map_err(::Error::from as _); Handshake { inner: handshake, @@ -179,8 +178,9 @@ impl Builder { /// It's important to note that this does not **flush** the outbound /// settings to the wire. pub fn handshake(&self, io: T) -> Handshake - where T: AsyncRead + AsyncWrite, - B: IntoBuf + where + T: AsyncRead + AsyncWrite, + B: IntoBuf, { Client::handshake2(io, self.settings.clone()) } @@ -204,8 +204,9 @@ where let mut codec = Codec::new(io); // Send initial settings frame - codec.buffer(self.settings.clone().into()) - .expect("invalid SETTINGS frame"); + codec + .buffer(self.settings.clone().into()) + .expect("invalid SETTINGS frame"); let connection = Connection::new(codec, &self.settings); Ok(Async::Ready(Client { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index c4a3edd..bbf74f3 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -58,7 +58,10 @@ where P: Peer, B: IntoBuf, { - pub fn new(codec: Codec>, settings: &frame::Settings) -> Connection { + pub fn new( + codec: Codec>, + settings: &frame::Settings, + ) -> Connection { // TODO: Actually configure let streams = Streams::new(streams::Config { max_remote_initiated: None, @@ -68,7 +71,6 @@ where .initial_window_size() .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), }); - Connection { state: State::Open, codec: codec, diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 227e1a8..47f8350 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -225,8 +225,14 @@ fn recv_data_overflows_connection_window() { .and_then(|resp| { assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_parts().1; - // FIXME: body stream should error also - body.concat2().unwrap() + body.concat2().then(|res| { + let err = res.unwrap_err(); + assert_eq!( + err.to_string(), + "protocol error: flow-control protocol violated" + ); + Ok::<(), ()>(()) + }) }); // client should see a flow control error @@ -244,11 +250,76 @@ fn recv_data_overflows_connection_window() { } #[test] -#[ignore] fn recv_data_overflows_stream_window() { // this tests for when streams have smaller windows than their connection + let _ = ::env_logger::init(); + + let (io, srv) = mock::new(); + + let mock = srv.assert_client_handshake().unwrap() + .ignore_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos() + ) + .send_frame( + frames::headers(1) + .response(200) + ) + // fill the whole window + .send_frame(frames::data(1, vec![0u8; 16_384])) + // this frame overflows the window! + .send_frame(frames::data(1, &[0; 16][..]).eos()) + // expecting goaway for the conn + // TODO: change to a RST_STREAM eventually + .recv_frame(frames::go_away(0).flow_control()) + // close the connection + .map(drop); + + let h2 = Client::builder() + .initial_window_size(16_384) + .handshake::<_, Bytes>(io) + .unwrap() + .and_then(|mut h2| { + let request = Request::builder() + .method(Method::GET) + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + + let req = h2.request(request, true) + .unwrap() + .unwrap() + .and_then(|resp| { + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.into_parts().1; + body.concat2().then(|res| { + let err = res.unwrap_err(); + assert_eq!( + err.to_string(), + "protocol error: flow-control protocol violated" + ); + Ok::<(), ()>(()) + }) + }); + + // client should see a flow control error + let conn = h2.then(|res| { + let err = res.unwrap_err(); + assert_eq!( + err.to_string(), + "protocol error: flow-control protocol violated" + ); + Ok::<(), ()>(()) + }); + conn.unwrap().join(req) + }); + h2.join(mock).wait().unwrap(); } + + #[test] #[ignore] fn recv_window_update_causes_overflow() { diff --git a/tests/stream_states.rs b/tests/stream_states.rs index 738bc0e..ff1be13 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -62,10 +62,7 @@ fn send_recv_data() { ]) .build(); - let mut h2 = Client::builder() - .handshake(mock) - .wait() - .unwrap(); + let mut h2 = Client::builder().handshake(mock).wait().unwrap(); let request = Request::builder() .method(Method::POST) diff --git a/tests/support/src/frames.rs b/tests/support/src/frames.rs index 3e86753..af5719a 100644 --- a/tests/support/src/frames.rs +++ b/tests/support/src/frames.rs @@ -11,21 +11,21 @@ pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; // ==== helper functions to easily construct h2 Frames ==== -pub fn headers(id: T) -> MockHeaders +pub fn headers(id: T) -> Mock where T: Into, { - MockHeaders(frame::Headers::new( + Mock(frame::Headers::new( id.into(), frame::Pseudo::default(), HeaderMap::default(), )) } -pub fn data(id: T, buf: B) -> MockData +pub fn data(id: T, buf: B) -> Mock where T: Into, B: Into, { - MockData(frame::Data::new(id.into(), buf.into())) + Mock(frame::Data::new(id.into(), buf.into())) } pub fn window_update(id: T, sz: u32) -> frame::WindowUpdate @@ -34,17 +34,38 @@ pub fn window_update(id: T, sz: u32) -> frame::WindowUpdate frame::WindowUpdate::new(id.into(), sz) } -pub fn go_away(id: T) -> MockGoAway +pub fn go_away(id: T) -> Mock where T: Into, { - MockGoAway(frame::GoAway::new(id.into(), frame::Reason::NoError)) + Mock(frame::GoAway::new(id.into(), frame::Reason::NoError)) +} + +pub fn reset(id: T) -> Mock + where T: Into, +{ + Mock(frame::Reset::new(id.into(), frame::Reason::NoError)) +} + +// === Generic helpers of all frame types + +pub struct Mock(T); + +impl fmt::Debug for Mock { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.0, f) + } +} + +impl From> for Frame +where T: Into { + fn from(src: Mock) -> Self { + src.0.into() + } } // Headers helpers -pub struct MockHeaders(frame::Headers); - -impl MockHeaders { +impl Mock { pub fn request(self, method: M, uri: U) -> Self where M: HttpTryInto, U: HttpTryInto, @@ -57,7 +78,7 @@ impl MockHeaders { frame::Pseudo::request(method, uri), fields ); - MockHeaders(frame) + Mock(frame) } pub fn response(self, status: S) -> Self @@ -70,13 +91,13 @@ impl MockHeaders { frame::Pseudo::response(status), fields ); - MockHeaders(frame) + Mock(frame) } pub fn fields(self, fields: HeaderMap) -> Self { let (id, pseudo, _) = self.into_parts(); let frame = frame::Headers::new(id, pseudo, fields); - MockHeaders(frame) + Mock(frame) } pub fn eos(mut self) -> Self { @@ -93,49 +114,23 @@ impl MockHeaders { } } -impl fmt::Debug for MockHeaders { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(&self.0, f) - } -} - -impl From for Frame { - fn from(src: MockHeaders) -> Self { - Frame::Headers(src.0) - } -} - -impl From for SendFrame { - fn from(src: MockHeaders) -> Self { +impl From> for SendFrame { + fn from(src: Mock) -> Self { Frame::Headers(src.0) } } // Data helpers -pub struct MockData(frame::Data); - -impl MockData { +impl Mock { pub fn eos(mut self) -> Self { self.0.set_end_stream(true); self } } -impl fmt::Debug for MockData { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(&self.0, f) - } -} - -impl From for Frame { - fn from(src: MockData) -> Self { - Frame::Data(src.0) - } -} - -impl From for SendFrame { - fn from(src: MockData) -> Self { +impl From> for SendFrame { + fn from(src: Mock) -> Self { let id = src.0.stream_id(); let eos = src.0.is_end_stream(); let payload = src.0.into_payload(); @@ -145,32 +140,32 @@ impl From for SendFrame { } } - // GoAway helpers -pub struct MockGoAway(frame::GoAway); - -impl MockGoAway { +impl Mock { pub fn flow_control(self) -> Self { - MockGoAway(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FlowControlError)) + Mock(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FlowControlError)) } } -impl fmt::Debug for MockGoAway { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(&self.0, f) - } -} - -impl From for Frame { - fn from(src: MockGoAway) -> Self { +impl From> for SendFrame { + fn from(src: Mock) -> Self { Frame::GoAway(src.0) } } -impl From for SendFrame { - fn from(src: MockGoAway) -> Self { - Frame::GoAway(src.0) +// ==== Reset helpers + +impl Mock { + pub fn flow_control(self) -> Self { + let id = self.0.stream_id(); + Mock(frame::Reset::new(id, frame::Reason::FlowControlError)) + } +} + +impl From> for SendFrame { + fn from(src: Mock) -> Self { + Frame::Reset(src.0) } } diff --git a/tests/support/src/mock.rs b/tests/support/src/mock.rs index dc22a26..67e4731 100644 --- a/tests/support/src/mock.rs +++ b/tests/support/src/mock.rs @@ -326,6 +326,14 @@ pub trait HandleFutureExt { } } + fn ignore_settings(self) -> Box> + where Self: Sized + 'static, + Self: Future, + Self::Error: fmt::Debug, + { + Box::new(self.map(|(_settings, handle)| handle).unwrap()) + } + fn recv_frame(self, frame: T) -> RecvFrame<::Future> where Self: IntoRecvFrame + Sized, T: Into,