add test when stream window overflows before conn window

This commit is contained in:
Sean McArthur
2017-09-11 15:05:52 -07:00
parent e2cda1860b
commit 3ec0e85e56
6 changed files with 151 additions and 77 deletions

View File

@@ -48,7 +48,6 @@ impl<T> Client<T, Bytes>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
{ {
/// Bind an H2 client connection. /// Bind an H2 client connection.
/// ///
/// Returns a future which resolves to the connection value once the H2 /// Returns a future which resolves to the connection value once the H2
@@ -69,8 +68,9 @@ impl Client<(), Bytes> {
} }
impl<T, B> Client<T, B> impl<T, B> Client<T, B>
where T: AsyncRead + AsyncWrite, where
B: IntoBuf T: AsyncRead + AsyncWrite,
B: IntoBuf,
{ {
fn handshake2(io: T, settings: Settings) -> Handshake<T, B> { fn handshake2(io: T, settings: Settings) -> Handshake<T, B> {
use tokio_io::io; use tokio_io::io;
@@ -78,8 +78,7 @@ where T: AsyncRead + AsyncWrite,
debug!("binding client connection"); debug!("binding client connection");
let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
let handshake = io::write_all(io, msg) let handshake = io::write_all(io, msg).map_err(::Error::from as _);
.map_err(::Error::from as _);
Handshake { Handshake {
inner: handshake, inner: handshake,
@@ -179,8 +178,9 @@ impl Builder {
/// It's important to note that this does not **flush** the outbound /// It's important to note that this does not **flush** the outbound
/// settings to the wire. /// settings to the wire.
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B> pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
where T: AsyncRead + AsyncWrite, where
B: IntoBuf T: AsyncRead + AsyncWrite,
B: IntoBuf,
{ {
Client::handshake2(io, self.settings.clone()) Client::handshake2(io, self.settings.clone())
} }
@@ -204,7 +204,8 @@ where
let mut codec = Codec::new(io); let mut codec = Codec::new(io);
// Send initial settings frame // Send initial settings frame
codec.buffer(self.settings.clone().into()) codec
.buffer(self.settings.clone().into())
.expect("invalid SETTINGS frame"); .expect("invalid SETTINGS frame");
let connection = Connection::new(codec, &self.settings); let connection = Connection::new(codec, &self.settings);

View File

@@ -58,7 +58,10 @@ where
P: Peer, P: Peer,
B: IntoBuf, B: IntoBuf,
{ {
pub fn new(codec: Codec<T, Prioritized<B::Buf>>, settings: &frame::Settings) -> Connection<T, P, B> { pub fn new(
codec: Codec<T, Prioritized<B::Buf>>,
settings: &frame::Settings,
) -> Connection<T, P, B> {
// TODO: Actually configure // TODO: Actually configure
let streams = Streams::new(streams::Config { let streams = Streams::new(streams::Config {
max_remote_initiated: None, max_remote_initiated: None,
@@ -68,7 +71,6 @@ where
.initial_window_size() .initial_window_size()
.unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE),
}); });
Connection { Connection {
state: State::Open, state: State::Open,
codec: codec, codec: codec,

View File

@@ -225,8 +225,14 @@ fn recv_data_overflows_connection_window() {
.and_then(|resp| { .and_then(|resp| {
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_parts().1; let body = resp.into_parts().1;
// FIXME: body stream should error also body.concat2().then(|res| {
body.concat2().unwrap() let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: flow-control protocol violated"
);
Ok::<(), ()>(())
})
}); });
// client should see a flow control error // client should see a flow control error
@@ -244,11 +250,76 @@ fn recv_data_overflows_connection_window() {
} }
#[test] #[test]
#[ignore]
fn recv_data_overflows_stream_window() { fn recv_data_overflows_stream_window() {
// this tests for when streams have smaller windows than their connection // this tests for when streams have smaller windows than their connection
let _ = ::env_logger::init();
let (io, srv) = mock::new();
let mock = srv.assert_client_handshake().unwrap()
.ignore_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos()
)
.send_frame(
frames::headers(1)
.response(200)
)
// fill the whole window
.send_frame(frames::data(1, vec![0u8; 16_384]))
// this frame overflows the window!
.send_frame(frames::data(1, &[0; 16][..]).eos())
// expecting goaway for the conn
// TODO: change to a RST_STREAM eventually
.recv_frame(frames::go_away(0).flow_control())
// close the connection
.map(drop);
let h2 = Client::builder()
.initial_window_size(16_384)
.handshake::<_, Bytes>(io)
.unwrap()
.and_then(|mut h2| {
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = h2.request(request, true)
.unwrap()
.unwrap()
.and_then(|resp| {
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_parts().1;
body.concat2().then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: flow-control protocol violated"
);
Ok::<(), ()>(())
})
});
// client should see a flow control error
let conn = h2.then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: flow-control protocol violated"
);
Ok::<(), ()>(())
});
conn.unwrap().join(req)
});
h2.join(mock).wait().unwrap();
} }
#[test] #[test]
#[ignore] #[ignore]
fn recv_window_update_causes_overflow() { fn recv_window_update_causes_overflow() {

View File

@@ -62,10 +62,7 @@ fn send_recv_data() {
]) ])
.build(); .build();
let mut h2 = Client::builder() let mut h2 = Client::builder().handshake(mock).wait().unwrap();
.handshake(mock)
.wait()
.unwrap();
let request = Request::builder() let request = Request::builder()
.method(Method::POST) .method(Method::POST)

View File

@@ -11,21 +11,21 @@ pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
// ==== helper functions to easily construct h2 Frames ==== // ==== helper functions to easily construct h2 Frames ====
pub fn headers<T>(id: T) -> MockHeaders pub fn headers<T>(id: T) -> Mock<frame::Headers>
where T: Into<StreamId>, where T: Into<StreamId>,
{ {
MockHeaders(frame::Headers::new( Mock(frame::Headers::new(
id.into(), id.into(),
frame::Pseudo::default(), frame::Pseudo::default(),
HeaderMap::default(), HeaderMap::default(),
)) ))
} }
pub fn data<T, B>(id: T, buf: B) -> MockData pub fn data<T, B>(id: T, buf: B) -> Mock<frame::Data>
where T: Into<StreamId>, where T: Into<StreamId>,
B: Into<Bytes>, B: Into<Bytes>,
{ {
MockData(frame::Data::new(id.into(), buf.into())) Mock(frame::Data::new(id.into(), buf.into()))
} }
pub fn window_update<T>(id: T, sz: u32) -> frame::WindowUpdate pub fn window_update<T>(id: T, sz: u32) -> frame::WindowUpdate
@@ -34,17 +34,38 @@ pub fn window_update<T>(id: T, sz: u32) -> frame::WindowUpdate
frame::WindowUpdate::new(id.into(), sz) frame::WindowUpdate::new(id.into(), sz)
} }
pub fn go_away<T>(id: T) -> MockGoAway pub fn go_away<T>(id: T) -> Mock<frame::GoAway>
where T: Into<StreamId>, where T: Into<StreamId>,
{ {
MockGoAway(frame::GoAway::new(id.into(), frame::Reason::NoError)) Mock(frame::GoAway::new(id.into(), frame::Reason::NoError))
}
pub fn reset<T>(id: T) -> Mock<frame::Reset>
where T: Into<StreamId>,
{
Mock(frame::Reset::new(id.into(), frame::Reason::NoError))
}
// === Generic helpers of all frame types
pub struct Mock<T>(T);
impl<T: fmt::Debug> fmt::Debug for Mock<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
impl<T> From<Mock<T>> for Frame
where T: Into<Frame> {
fn from(src: Mock<T>) -> Self {
src.0.into()
}
} }
// Headers helpers // Headers helpers
pub struct MockHeaders(frame::Headers); impl Mock<frame::Headers> {
impl MockHeaders {
pub fn request<M, U>(self, method: M, uri: U) -> Self pub fn request<M, U>(self, method: M, uri: U) -> Self
where M: HttpTryInto<http::Method>, where M: HttpTryInto<http::Method>,
U: HttpTryInto<http::Uri>, U: HttpTryInto<http::Uri>,
@@ -57,7 +78,7 @@ impl MockHeaders {
frame::Pseudo::request(method, uri), frame::Pseudo::request(method, uri),
fields fields
); );
MockHeaders(frame) Mock(frame)
} }
pub fn response<S>(self, status: S) -> Self pub fn response<S>(self, status: S) -> Self
@@ -70,13 +91,13 @@ impl MockHeaders {
frame::Pseudo::response(status), frame::Pseudo::response(status),
fields fields
); );
MockHeaders(frame) Mock(frame)
} }
pub fn fields(self, fields: HeaderMap) -> Self { pub fn fields(self, fields: HeaderMap) -> Self {
let (id, pseudo, _) = self.into_parts(); let (id, pseudo, _) = self.into_parts();
let frame = frame::Headers::new(id, pseudo, fields); let frame = frame::Headers::new(id, pseudo, fields);
MockHeaders(frame) Mock(frame)
} }
pub fn eos(mut self) -> Self { pub fn eos(mut self) -> Self {
@@ -93,49 +114,23 @@ impl MockHeaders {
} }
} }
impl fmt::Debug for MockHeaders { impl From<Mock<frame::Headers>> for SendFrame {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn from(src: Mock<frame::Headers>) -> Self {
fmt::Debug::fmt(&self.0, f)
}
}
impl From<MockHeaders> for Frame {
fn from(src: MockHeaders) -> Self {
Frame::Headers(src.0)
}
}
impl From<MockHeaders> for SendFrame {
fn from(src: MockHeaders) -> Self {
Frame::Headers(src.0) Frame::Headers(src.0)
} }
} }
// Data helpers // Data helpers
pub struct MockData(frame::Data); impl Mock<frame::Data> {
impl MockData {
pub fn eos(mut self) -> Self { pub fn eos(mut self) -> Self {
self.0.set_end_stream(true); self.0.set_end_stream(true);
self self
} }
} }
impl fmt::Debug for MockData { impl From<Mock<frame::Data>> for SendFrame {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn from(src: Mock<frame::Data>) -> Self {
fmt::Debug::fmt(&self.0, f)
}
}
impl From<MockData> for Frame {
fn from(src: MockData) -> Self {
Frame::Data(src.0)
}
}
impl From<MockData> for SendFrame {
fn from(src: MockData) -> Self {
let id = src.0.stream_id(); let id = src.0.stream_id();
let eos = src.0.is_end_stream(); let eos = src.0.is_end_stream();
let payload = src.0.into_payload(); let payload = src.0.into_payload();
@@ -145,32 +140,32 @@ impl From<MockData> for SendFrame {
} }
} }
// GoAway helpers // GoAway helpers
pub struct MockGoAway(frame::GoAway); impl Mock<frame::GoAway> {
impl MockGoAway {
pub fn flow_control(self) -> Self { pub fn flow_control(self) -> Self {
MockGoAway(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FlowControlError)) Mock(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FlowControlError))
} }
} }
impl fmt::Debug for MockGoAway { impl From<Mock<frame::GoAway>> for SendFrame {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn from(src: Mock<frame::GoAway>) -> Self {
fmt::Debug::fmt(&self.0, f)
}
}
impl From<MockGoAway> for Frame {
fn from(src: MockGoAway) -> Self {
Frame::GoAway(src.0) Frame::GoAway(src.0)
} }
} }
impl From<MockGoAway> for SendFrame { // ==== Reset helpers
fn from(src: MockGoAway) -> Self {
Frame::GoAway(src.0) impl Mock<frame::Reset> {
pub fn flow_control(self) -> Self {
let id = self.0.stream_id();
Mock(frame::Reset::new(id, frame::Reason::FlowControlError))
}
}
impl From<Mock<frame::Reset>> for SendFrame {
fn from(src: Mock<frame::Reset>) -> Self {
Frame::Reset(src.0)
} }
} }

View File

@@ -326,6 +326,14 @@ pub trait HandleFutureExt {
} }
} }
fn ignore_settings(self) -> Box<Future<Item=Handle, Error=()>>
where Self: Sized + 'static,
Self: Future<Item=(frame::Settings, Handle)>,
Self::Error: fmt::Debug,
{
Box::new(self.map(|(_settings, handle)| handle).unwrap())
}
fn recv_frame<T>(self, frame: T) -> RecvFrame<<Self as IntoRecvFrame>::Future> fn recv_frame<T>(self, frame: T) -> RecvFrame<<Self as IntoRecvFrame>::Future>
where Self: IntoRecvFrame + Sized, where Self: IntoRecvFrame + Sized,
T: Into<Frame>, T: Into<Frame>,