Flow control bug fix (#70)
The requested capacity was not decreased as data is written.
This commit is contained in:
@@ -144,6 +144,12 @@ impl<B, P> Prioritize<B, P>
|
|||||||
|
|
||||||
/// Request capacity to send data
|
/// Request capacity to send data
|
||||||
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr<B, P>) {
|
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr<B, P>) {
|
||||||
|
trace!("reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
|
||||||
|
stream.id,
|
||||||
|
capacity,
|
||||||
|
capacity + stream.buffered_send_data,
|
||||||
|
stream.requested_send_capacity);
|
||||||
|
|
||||||
// Actual capacity is `capacity` + the current amount of buffered data.
|
// Actual capacity is `capacity` + the current amount of buffered data.
|
||||||
// It it were less, then we could never send out the buffered data.
|
// It it were less, then we could never send out the buffered data.
|
||||||
let capacity = capacity + stream.buffered_send_data;
|
let capacity = capacity + stream.buffered_send_data;
|
||||||
@@ -242,8 +248,12 @@ impl<B, P> Prioritize<B, P>
|
|||||||
total_requested - stream.send_flow.available(),
|
total_requested - stream.send_flow.available(),
|
||||||
stream.send_flow.window_size());
|
stream.send_flow.window_size());
|
||||||
|
|
||||||
trace!("try_assign_capacity; requested={}; additional={}; window={}; conn={}",
|
trace!("try_assign_capacity; requested={}; additional={}; buffered={}; window={}; conn={}",
|
||||||
total_requested, additional, stream.send_flow.window_size(), self.flow.available());
|
total_requested,
|
||||||
|
additional,
|
||||||
|
stream.buffered_send_data,
|
||||||
|
stream.send_flow.window_size(),
|
||||||
|
self.flow.available());
|
||||||
|
|
||||||
if additional == 0 {
|
if additional == 0 {
|
||||||
// Nothing more to do
|
// Nothing more to do
|
||||||
@@ -296,7 +306,18 @@ impl<B, P> Prioritize<B, P>
|
|||||||
// If data is buffered, then schedule the stream for execution
|
// If data is buffered, then schedule the stream for execution
|
||||||
if stream.buffered_send_data > 0 {
|
if stream.buffered_send_data > 0 {
|
||||||
debug_assert!(stream.send_flow.available() > 0);
|
debug_assert!(stream.send_flow.available() > 0);
|
||||||
debug_assert!(!stream.pending_send.is_empty());
|
|
||||||
|
// TODO: This assertion isn't *exactly* correct. There can still be
|
||||||
|
// buffered send data while the stream's pending send queue is
|
||||||
|
// empty. This can happen when a large data frame is in the process
|
||||||
|
// of being **partially** sent. Once the window has been sent, the
|
||||||
|
// data frame will be returned to the prioritization layer to be
|
||||||
|
// re-scheduled.
|
||||||
|
//
|
||||||
|
// That said, it would be nice to figure out how to make this
|
||||||
|
// assertion correctly.
|
||||||
|
//
|
||||||
|
// debug_assert!(!stream.pending_send.is_empty());
|
||||||
|
|
||||||
self.pending_send.push(stream);
|
self.pending_send.push(stream);
|
||||||
}
|
}
|
||||||
@@ -464,32 +485,36 @@ impl<B, P> Prioritize<B, P>
|
|||||||
let len = cmp::min(sz, max_len);
|
let len = cmp::min(sz, max_len);
|
||||||
|
|
||||||
// Only send up to the stream's window capacity
|
// Only send up to the stream's window capacity
|
||||||
let len = cmp::min(len, stream_capacity as usize);
|
let len = cmp::min(len, stream_capacity as usize) as WindowSize;
|
||||||
|
|
||||||
// There *must* be be enough connection level
|
// There *must* be be enough connection level
|
||||||
// capacity at this point.
|
// capacity at this point.
|
||||||
debug_assert!(len <= self.flow.window_size() as usize);
|
debug_assert!(len <= self.flow.window_size());
|
||||||
|
|
||||||
|
trace!(" --> sending data frame; len={}", len);
|
||||||
|
|
||||||
// Update the flow control
|
// Update the flow control
|
||||||
trace!(" -- updating stream flow --");
|
trace!(" -- updating stream flow --");
|
||||||
stream.send_flow.send_data(len as WindowSize);
|
stream.send_flow.send_data(len);
|
||||||
|
|
||||||
// Decrement the stream's buffered data counter
|
// Decrement the stream's buffered data counter
|
||||||
debug_assert!(stream.buffered_send_data >= len as u32);
|
debug_assert!(stream.buffered_send_data >= len);
|
||||||
stream.buffered_send_data -= len as u32;
|
stream.buffered_send_data -= len;
|
||||||
|
stream.requested_send_capacity -= len;
|
||||||
|
|
||||||
// Assign the capacity back to the connection that
|
// Assign the capacity back to the connection that
|
||||||
// was just consumed from the stream in the previous
|
// was just consumed from the stream in the previous
|
||||||
// line.
|
// line.
|
||||||
self.flow.assign_capacity(len as WindowSize);
|
self.flow.assign_capacity(len);
|
||||||
|
|
||||||
trace!(" -- updating connection flow --");
|
trace!(" -- updating connection flow --");
|
||||||
self.flow.send_data(len as WindowSize);
|
self.flow.send_data(len);
|
||||||
|
|
||||||
// Wrap the frame's data payload to ensure that the
|
// Wrap the frame's data payload to ensure that the
|
||||||
// correct amount of data gets written.
|
// correct amount of data gets written.
|
||||||
|
|
||||||
let eos = frame.is_end_stream();
|
let eos = frame.is_end_stream();
|
||||||
|
let len = len as usize;
|
||||||
|
|
||||||
if frame.payload().remaining() > len {
|
if frame.payload().remaining() > len {
|
||||||
frame.set_end_stream(false);
|
frame.set_end_stream(false);
|
||||||
|
|||||||
@@ -402,3 +402,92 @@ fn stream_close_by_send_reset_frame_releases_capacity() {
|
|||||||
#[ignore]
|
#[ignore]
|
||||||
fn stream_close_by_recv_reset_frame_releases_capacity() {
|
fn stream_close_by_recv_reset_frame_releases_capacity() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use futures::{Async, Poll};
|
||||||
|
|
||||||
|
struct GetResponse {
|
||||||
|
stream: Option<client::Stream<Bytes>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for GetResponse {
|
||||||
|
type Item = (Response<client::Body<Bytes>>, client::Stream<Bytes>);
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
let response = match self.stream.as_mut().unwrap().poll_response() {
|
||||||
|
Ok(Async::Ready(v)) => v,
|
||||||
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
|
Err(e) => panic!("unexpected error; {:?}", e),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Async::Ready((response, self.stream.take().unwrap())))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn recv_window_update_on_stream_closed_by_data_frame() {
|
||||||
|
let _ = ::env_logger::init();
|
||||||
|
let (m, mock) = mock::new();
|
||||||
|
|
||||||
|
let h2 = Client::handshake(m).unwrap()
|
||||||
|
.and_then(|mut h2| {
|
||||||
|
let request = Request::builder()
|
||||||
|
.method(Method::POST)
|
||||||
|
.uri("https://http2.akamai.com/")
|
||||||
|
.body(()).unwrap();
|
||||||
|
|
||||||
|
let stream = h2.request(request, false).unwrap();
|
||||||
|
|
||||||
|
// Wait for the response
|
||||||
|
h2.drive(GetResponse {
|
||||||
|
stream: Some(stream),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.and_then(|(h2, (response, mut stream))| {
|
||||||
|
assert_eq!(response.status(), StatusCode::OK);
|
||||||
|
|
||||||
|
// Send a data frame, this will also close the connection
|
||||||
|
stream.send_data("hello".into(), true).unwrap();
|
||||||
|
|
||||||
|
// Wait for the connection to close
|
||||||
|
h2.unwrap()
|
||||||
|
})
|
||||||
|
;
|
||||||
|
|
||||||
|
let mock = mock.assert_client_handshake().unwrap()
|
||||||
|
// Get the first frame
|
||||||
|
.and_then(|(_, mock)| mock.into_future().unwrap())
|
||||||
|
.and_then(|(frame, mut mock)| {
|
||||||
|
let request = assert_headers!(frame.unwrap());
|
||||||
|
|
||||||
|
assert_eq!(request.stream_id(), 1);
|
||||||
|
assert!(!request.is_end_stream());
|
||||||
|
|
||||||
|
// Send the response which also closes the stream
|
||||||
|
let mut f = frame::Headers::new(
|
||||||
|
request.stream_id(),
|
||||||
|
frame::Pseudo::response(StatusCode::OK),
|
||||||
|
HeaderMap::new());
|
||||||
|
f.set_end_stream();
|
||||||
|
|
||||||
|
mock.send(f.into()).unwrap();
|
||||||
|
|
||||||
|
mock.into_future().unwrap()
|
||||||
|
})
|
||||||
|
.and_then(|(frame, mut mock)| {
|
||||||
|
let data = assert_data!(frame.unwrap());
|
||||||
|
assert_eq!(data.payload(), "hello");
|
||||||
|
|
||||||
|
// Send a window update just for fun
|
||||||
|
let f = frame::WindowUpdate::new(
|
||||||
|
data.stream_id(), data.payload().len() as u32);
|
||||||
|
|
||||||
|
mock.send(f.into()).unwrap();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
;
|
||||||
|
|
||||||
|
let _ = h2.join(mock)
|
||||||
|
.wait().unwrap();
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user