Merge pull request #88 from seanmonstar/config-max-frame-size

Configure max frame size
This commit is contained in:
Sean McArthur
2017-09-14 17:29:49 -07:00
committed by GitHub
13 changed files with 221 additions and 18 deletions

View File

@@ -170,6 +170,12 @@ impl Builder {
self self
} }
/// Set the max frame size of received frames.
pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
self.settings.set_max_frame_size(Some(max));
self
}
/// Bind an H2 client connection. /// Bind an H2 client connection.
/// ///
/// Returns a future which resolves to the connection value once the H2 /// Returns a future which resolves to the connection value once the H2
@@ -203,6 +209,10 @@ where
// Create the codec // Create the codec
let mut codec = Codec::new(io); let mut codec = Codec::new(io);
if let Some(max) = self.settings.max_frame_size() {
codec.set_max_recv_frame_size(max as usize);
}
// Send initial settings frame // Send initial settings frame
codec codec
.buffer(self.settings.clone().into()) .buffer(self.settings.clone().into())

View File

@@ -1,6 +1,6 @@
use codec::RecvError; use codec::RecvError;
use frame::{self, Frame, Kind}; use frame::{self, Frame, Kind, Reason};
use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE; use frame::{DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE};
use frame::Reason::*; use frame::Reason::*;
use hpack; use hpack;
@@ -9,6 +9,8 @@ use futures::*;
use bytes::BytesMut; use bytes::BytesMut;
use std::io;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
use tokio_io::codec::length_delimited; use tokio_io::codec::length_delimited;
@@ -228,9 +230,12 @@ impl<T> FramedRead<T> {
} }
/// Updates the max frame size setting. /// Updates the max frame size setting.
///
/// Must be within 16,384 and 16,777,215.
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
#[inline] #[inline]
pub fn set_max_frame_size(&mut self, val: usize) { pub fn set_max_frame_size(&mut self, val: usize) {
assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize);
self.inner.set_max_frame_length(val) self.inner.set_max_frame_length(val)
} }
} }
@@ -245,7 +250,7 @@ where
fn poll(&mut self) -> Poll<Option<Frame>, Self::Error> { fn poll(&mut self) -> Poll<Option<Frame>, Self::Error> {
loop { loop {
trace!("poll"); trace!("poll");
let bytes = match try_ready!(self.inner.poll()) { let bytes = match try_ready!(self.inner.poll().map_err(map_err)) {
Some(bytes) => bytes, Some(bytes) => bytes,
None => return Ok(Async::Ready(None)), None => return Ok(Async::Ready(None)),
}; };
@@ -258,3 +263,17 @@ where
} }
} }
} }
fn map_err(err: io::Error) -> RecvError {
use std::error::Error;
if let io::ErrorKind::InvalidData = err.kind() {
// woah, brittle...
// TODO: with tokio-io v0.1.4, we can check
// err.get_ref().is::<tokio_io::length_delimited::FrameTooBig>()
if err.description() == "frame size too big" {
return RecvError::Connection(Reason::FrameSizeError);
}
}
err.into()
}

View File

@@ -227,7 +227,7 @@ impl<T, B> FramedWrite<T, B> {
/// Set the peer's max frame size. /// Set the peer's max frame size.
pub fn set_max_frame_size(&mut self, val: usize) { pub fn set_max_frame_size(&mut self, val: usize) {
assert!(val <= frame::MAX_MAX_FRAME_SIZE); assert!(val <= frame::MAX_MAX_FRAME_SIZE as usize);
self.max_frame_size = val as FrameSize; self.max_frame_size = val as FrameSize;
} }

View File

@@ -45,10 +45,12 @@ where
.length_field_length(3) .length_field_length(3)
.length_adjustment(9) .length_adjustment(9)
.num_skip(0) // Don't skip the header .num_skip(0) // Don't skip the header
.max_frame_length(max_frame_size)
.new_read(framed_write); .new_read(framed_write);
let inner = FramedRead::new(delimited); let mut inner = FramedRead::new(delimited);
// Use FramedRead's method since it checks the value is within range.
inner.set_max_frame_size(max_frame_size);
Codec { Codec {
inner, inner,
@@ -66,7 +68,6 @@ impl<T, B> Codec<T, B> {
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
#[inline] #[inline]
pub fn set_max_recv_frame_size(&mut self, val: usize) { pub fn set_max_recv_frame_size(&mut self, val: usize) {
// TODO: should probably make some assertions about max frame size...
self.inner.set_max_frame_size(val) self.inner.set_max_frame_size(val)
} }

View File

@@ -33,7 +33,7 @@ impl Reason {
FlowControlError => "flow-control protocol violated", FlowControlError => "flow-control protocol violated",
SettingsTimeout => "settings ACK not received in timely manner", SettingsTimeout => "settings ACK not received in timely manner",
StreamClosed => "received frame when stream half-closed", StreamClosed => "received frame when stream half-closed",
FrameSizeError => "frame sent with invalid size", FrameSizeError => "frame with invalid size",
RefusedStream => "refused stream before processing any application logic", RefusedStream => "refused stream before processing any application logic",
Cancel => "stream no longer needed", Cancel => "stream no longer needed",
CompressionError => "unable to maintain the header compression context", CompressionError => "unable to maintain the header compression context",

View File

@@ -46,7 +46,7 @@ pub const DEFAULT_MAX_FRAME_SIZE: FrameSize = 16_384;
pub const MAX_INITIAL_WINDOW_SIZE: usize = (1 << 31) - 1; pub const MAX_INITIAL_WINDOW_SIZE: usize = (1 << 31) - 1;
/// MAX_FRAME_SIZE upper bound /// MAX_FRAME_SIZE upper bound
pub const MAX_MAX_FRAME_SIZE: usize = (1 << 24) - 1; pub const MAX_MAX_FRAME_SIZE: FrameSize = (1 << 24) - 1;
// ===== impl Settings ===== // ===== impl Settings =====
@@ -78,6 +78,13 @@ impl Settings {
self.max_frame_size self.max_frame_size
} }
pub fn set_max_frame_size(&mut self, size: Option<u32>) {
if let Some(val) = size {
assert!(DEFAULT_MAX_FRAME_SIZE <= val && val <= MAX_MAX_FRAME_SIZE);
}
self.max_frame_size = size;
}
pub fn load(head: Head, payload: &[u8]) -> Result<Settings, Error> { pub fn load(head: Head, payload: &[u8]) -> Result<Settings, Error> {
use self::Setting::*; use self::Setting::*;
@@ -131,7 +138,7 @@ impl Settings {
settings.initial_window_size = Some(val); settings.initial_window_size = Some(val);
}, },
Some(MaxFrameSize(val)) => { Some(MaxFrameSize(val)) => {
if val < DEFAULT_MAX_FRAME_SIZE || val as usize > MAX_MAX_FRAME_SIZE { if val < DEFAULT_MAX_FRAME_SIZE || val > MAX_MAX_FRAME_SIZE {
return Err(Error::InvalidSettingValue); return Err(Error::InvalidSettingValue);
} else { } else {
settings.max_frame_size = Some(val); settings.max_frame_size = Some(val);

View File

@@ -93,6 +93,10 @@ where
// Create the codec // Create the codec
let mut codec = Codec::new(io); let mut codec = Codec::new(io);
if let Some(max) = settings.max_frame_size() {
codec.set_max_recv_frame_size(max as usize);
}
// Send initial settings frame // Send initial settings frame
codec codec
.buffer(settings.clone().into()) .buffer(settings.clone().into())
@@ -180,13 +184,20 @@ impl Builder {
self self
} }
/// Set the max frame size of received frames.
pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
self.settings.set_max_frame_size(Some(max));
self
}
/// Bind an H2 server connection. /// Bind an H2 server connection.
/// ///
/// Returns a future which resolves to the connection value once the H2 /// Returns a future which resolves to the connection value once the H2
/// handshake has been completed. /// handshake has been completed.
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B> pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
where T: AsyncRead + AsyncWrite + 'static, where
B: IntoBuf + 'static T: AsyncRead + AsyncWrite + 'static,
B: IntoBuf + 'static,
{ {
Server::handshake2(io, self.settings.clone()) Server::handshake2(io, self.settings.clone())
} }

View File

@@ -110,22 +110,24 @@ fn read_headers_empty_payload() {}
#[test] #[test]
fn update_max_frame_len_at_rest() { fn update_max_frame_len_at_rest() {
let _ = ::env_logger::init();
// TODO: add test for updating max frame length in flight as well? // TODO: add test for updating max frame length in flight as well?
let mut codec = raw_codec! { let mut codec = raw_codec! {
read => [ read => [
0, 0, 5, 0, 0, 0, 0, 0, 1, 0, 0, 5, 0, 0, 0, 0, 0, 1,
"hello", "hello",
"world", 0, 64, 1, 0, 0, 0, 0, 0, 1,
vec![0; 16_385],
]; ];
}; };
assert_eq!(poll_data!(codec).payload(), &b"hello"[..]); assert_eq!(poll_data!(codec).payload(), &b"hello"[..]);
codec.set_max_recv_frame_size(2); codec.set_max_recv_frame_size(16_384);
assert_eq!(codec.max_recv_frame_size(), 2); assert_eq!(codec.max_recv_frame_size(), 16_384);
assert_eq!( assert_eq!(
codec.poll().unwrap_err().description(), codec.poll().unwrap_err().description(),
"frame size too big" "frame with invalid size"
); );
} }

View File

@@ -195,6 +195,110 @@ fn closed_streams_are_released() {
let _ = h2.join(srv).wait().unwrap(); let _ = h2.join(srv).wait().unwrap();
} }
#[test]
fn errors_if_recv_frame_exceeds_max_frame_size() {
let _ = ::env_logger::init();
let (io, mut srv) = mock::new();
let h2 = Client::handshake(io).unwrap().and_then(|mut h2| {
let request = Request::builder()
.method(Method::GET)
.uri("https://example.com/")
.body(())
.unwrap();
let req = h2.request(request, true)
.unwrap()
.unwrap()
.and_then(|resp| {
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_parts().1;
body.concat2().then(|res| {
let err = res.unwrap_err();
assert_eq!(err.to_string(), "protocol error: frame with invalid size");
Ok::<(), ()>(())
})
});
// client should see a conn error
let conn = h2.then(|res| {
let err = res.unwrap_err();
assert_eq!(err.to_string(), "protocol error: frame with invalid size");
Ok::<(), ()>(())
});
conn.unwrap().join(req)
});
// a bad peer
srv.codec_mut().set_max_send_frame_size(16_384 * 4);
let srv = srv.assert_client_handshake()
.unwrap()
.ignore_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://example.com/")
.eos(),
)
.send_frame(frames::headers(1).response(200))
.send_frame(frames::data(1, vec![0; 16_385]).eos())
.recv_frame(frames::go_away(0).frame_size())
.close();
let _ = h2.join(srv).wait().unwrap();
}
#[test]
fn configure_max_frame_size() {
let _ = ::env_logger::init();
let (io, mut srv) = mock::new();
let h2 = Client::builder()
.max_frame_size(16_384 * 2)
.handshake::<_, Bytes>(io)
.expect("handshake")
.and_then(|mut h2| {
let request = Request::builder()
.method(Method::GET)
.uri("https://example.com/")
.body(())
.unwrap();
let req = h2.request(request, true)
.unwrap()
.expect("response")
.and_then(|resp| {
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_parts().1;
body.concat2().expect("body")
})
.and_then(|buf| {
assert_eq!(buf.len(), 16_385);
Ok(())
});
h2.expect("client").join(req)
});
// a good peer
srv.codec_mut().set_max_send_frame_size(16_384 * 2);
let srv = srv.assert_client_handshake()
.unwrap()
.ignore_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://example.com/")
.eos(),
)
.send_frame(frames::headers(1).response(200))
.send_frame(frames::data(1, vec![0; 16_385]).eos())
.close();
let _ = h2.join(srv).wait().expect("wait");
}
/* /*
#[test] #[test]
fn send_data_after_headers_eos() { fn send_data_after_headers_eos() {

View File

@@ -146,6 +146,10 @@ impl Mock<frame::GoAway> {
pub fn flow_control(self) -> Self { pub fn flow_control(self) -> Self {
Mock(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FlowControlError)) Mock(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FlowControlError))
} }
pub fn frame_size(self) -> Self {
Mock(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FrameSizeError))
}
} }
impl From<Mock<frame::GoAway>> for SendFrame { impl From<Mock<frame::GoAway>> for SendFrame {

View File

@@ -12,6 +12,18 @@ pub trait FutureExt: Future {
Unwrap { inner: self } Unwrap { inner: self }
} }
/// Panic on error, with a message.
fn expect<T>(self, msg: T) -> Expect<Self>
where Self: Sized,
Self::Error: fmt::Debug,
T: fmt::Display,
{
Expect {
inner: self,
msg: msg.to_string(),
}
}
/// Drive `other` by polling `self`. /// Drive `other` by polling `self`.
/// ///
/// `self` must not resolve before `other` does. /// `self` must not resolve before `other` does.
@@ -51,6 +63,28 @@ impl<T> Future for Unwrap<T>
} }
} }
// ===== Expect ======
/// Panic on error
pub struct Expect<T> {
inner: T,
msg: String,
}
impl<T> Future for Expect<T>
where T: Future,
T::Item: fmt::Debug,
T::Error: fmt::Debug,
{
type Item = T::Item;
type Error = ();
fn poll(&mut self) -> Poll<T::Item, ()> {
Ok(self.inner.poll().expect(&self.msg))
}
}
// ===== Drive ====== // ===== Drive ======
/// Drive a future to completion while also polling the driver /// Drive a future to completion while also polling the driver

View File

@@ -26,7 +26,7 @@ pub struct Handle {
} }
#[derive(Debug)] #[derive(Debug)]
struct Pipe { pub struct Pipe {
inner: Arc<Mutex<Inner>>, inner: Arc<Mutex<Inner>>,
} }
@@ -67,6 +67,11 @@ pub fn new() -> (Mock, Handle) {
// ===== impl Handle ===== // ===== impl Handle =====
impl Handle { impl Handle {
/// Get a mutable reference to inner Codec.
pub fn codec_mut(&mut self) -> &mut ::Codec<Pipe> {
&mut self.codec
}
/// Send a frame /// Send a frame
pub fn send(&mut self, item: SendFrame) -> Result<(), SendError> { pub fn send(&mut self, item: SendFrame) -> Result<(), SendError> {
// Queue the frame // Queue the frame
@@ -237,7 +242,7 @@ impl io::Write for Mock {
let mut me = self.pipe.inner.lock().unwrap(); let mut me = self.pipe.inner.lock().unwrap();
if me.closed { if me.closed {
return Err(io::ErrorKind::BrokenPipe.into()); return Err(io::Error::new(io::ErrorKind::BrokenPipe, "mock closed"));
} }
me.tx.extend(buf); me.tx.extend(buf);

View File

@@ -44,3 +44,9 @@ impl<'a> Chunk for &'a str {
dst.extend(self.as_bytes()) dst.extend(self.as_bytes())
} }
} }
impl Chunk for Vec<u8> {
fn push(&self, dst: &mut Vec<u8>) {
dst.extend(self.iter())
}
}