Immediately apply initial window size to streams

The initial window size should be applied to streams once they leave the
IDLE state.
This commit is contained in:
Carl Lerche
2017-08-24 11:03:33 -07:00
parent 66dbde92ef
commit 6a6c9665cd
6 changed files with 69 additions and 21 deletions

View File

@@ -162,6 +162,11 @@ impl<B> Prioritize<B>
stream: &mut store::Ptr<B>) stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
// Ignore window updates when the stream is not active.
if !stream.state.could_send_data() {
return Ok(());
}
// Update the stream level flow control. // Update the stream level flow control.
stream.send_flow.inc_window(inc)?; stream.send_flow.inc_window(inc)?;
@@ -219,6 +224,8 @@ impl<B> Prioritize<B>
return; return;
} }
// If the stream has requested capacity, then it must be in the
// streaming state.
debug_assert!(stream.state.is_send_streaming()); debug_assert!(stream.state.is_send_streaming());
// The amount of currently available capacity on the connection // The amount of currently available capacity on the connection

View File

@@ -76,6 +76,11 @@ impl<B> Recv<B> where B: Buf {
} }
} }
/// Returns the initial receive window size
pub fn init_window_sz(&self) -> WindowSize {
self.init_window_sz
}
/// Returns the ID of the last processed stream /// Returns the ID of the last processed stream
pub fn last_processed_id(&self) -> StreamId { pub fn last_processed_id(&self) -> StreamId {
self.last_processed_id self.last_processed_id
@@ -85,7 +90,7 @@ impl<B> Recv<B> where B: Buf {
/// ///
/// Returns the stream state if successful. `None` if refused /// Returns the stream state if successful. `None` if refused
pub fn open<P: Peer>(&mut self, id: StreamId) pub fn open<P: Peer>(&mut self, id: StreamId)
-> Result<Option<Stream<B>>, ConnectionError> -> Result<Option<StreamId>, ConnectionError>
{ {
assert!(self.refused.is_none()); assert!(self.refused.is_none());
@@ -96,7 +101,7 @@ impl<B> Recv<B> where B: Buf {
return Ok(None); return Ok(None);
} }
Ok(Some(Stream::new(id))) Ok(Some(id))
} }
pub fn take_request(&mut self, stream: &mut store::Ptr<B>) pub fn take_request(&mut self, stream: &mut store::Ptr<B>)
@@ -141,11 +146,6 @@ impl<B> Recv<B> where B: Buf {
trace!("opening stream; init_window={}", self.init_window_sz); trace!("opening stream; init_window={}", self.init_window_sz);
let is_initial = stream.state.recv_open(frame.is_end_stream())?; let is_initial = stream.state.recv_open(frame.is_end_stream())?;
if stream.state.is_recv_streaming() {
stream.recv_flow.inc_window(self.init_window_sz)?;
stream.recv_flow.assign_capacity(self.init_window_sz);
}
if is_initial { if is_initial {
if !self.can_inc_num_streams() { if !self.can_inc_num_streams() {
unimplemented!(); unimplemented!();
@@ -285,6 +285,7 @@ impl<B> Recv<B> where B: Buf {
pub fn recv_push_promise<P: Peer>(&mut self, pub fn recv_push_promise<P: Peer>(&mut self,
frame: frame::PushPromise, frame: frame::PushPromise,
send: &Send<B>,
stream: store::Key, stream: store::Key,
store: &mut Store<B>) store: &mut Store<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
@@ -309,7 +310,11 @@ impl<B> Recv<B> where B: Buf {
// TODO: All earlier stream IDs should be implicitly closed. // TODO: All earlier stream IDs should be implicitly closed.
// Now, create a new entry for the stream // Now, create a new entry for the stream
let mut new_stream = Stream::new(frame.promised_id()); let mut new_stream = Stream::new(
frame.promised_id(),
send.init_window_sz(),
self.init_window_sz);
new_stream.state.reserve_remote(); new_stream.state.reserve_remote();
let mut ppp = store[stream].pending_push_promises.take(); let mut ppp = store[stream].pending_push_promises.take();

View File

@@ -47,6 +47,11 @@ impl<B> Send<B> where B: Buf {
} }
} }
/// Returns the initial send window size
pub fn init_window_sz(&self) -> WindowSize {
self.init_window_sz
}
pub fn poll_open_ready<P: Peer>(&mut self) -> Poll<(), ConnectionError> { pub fn poll_open_ready<P: Peer>(&mut self) -> Poll<(), ConnectionError> {
try!(self.ensure_can_open::<P>()); try!(self.ensure_can_open::<P>());
@@ -64,7 +69,7 @@ impl<B> Send<B> where B: Buf {
/// ///
/// Returns the stream state if successful. `None` if refused /// Returns the stream state if successful. `None` if refused
pub fn open<P: Peer>(&mut self) pub fn open<P: Peer>(&mut self)
-> Result<Stream<B>, ConnectionError> -> Result<StreamId, ConnectionError>
{ {
try!(self.ensure_can_open::<P>()); try!(self.ensure_can_open::<P>());
@@ -74,7 +79,7 @@ impl<B> Send<B> where B: Buf {
} }
} }
let ret = Stream::new(self.next_stream_id); let ret = self.next_stream_id;
// Increment the number of locally initiated streams // Increment the number of locally initiated streams
self.num_streams += 1; self.num_streams += 1;
@@ -93,10 +98,6 @@ impl<B> Send<B> where B: Buf {
// Update the state // Update the state
stream.state.send_open(frame.is_end_stream())?; stream.state.send_open(frame.is_end_stream())?;
if stream.state.is_send_streaming() {
stream.send_flow.inc_window(self.init_window_sz)?;
}
// Queue the frame for sending // Queue the frame for sending
self.prioritize.queue_frame(frame.into(), stream, task); self.prioritize.queue_frame(frame.into(), stream, task);

View File

@@ -266,6 +266,16 @@ impl State {
} }
} }
/// Returns true if the stream is in a state such that it could send data in
/// the future.
pub fn could_send_data(&self) -> bool {
match self.inner {
Open { .. } => true,
HalfClosedRemote(_) => true,
_ => false,
}
}
pub fn is_send_streaming(&self) -> bool { pub fn is_send_streaming(&self) -> bool {
match self.inner { match self.inner {
Open { local: Peer::Streaming, .. } => true, Open { local: Peer::Streaming, .. } => true,

View File

@@ -83,8 +83,20 @@ pub(super) struct NextSendCapacity;
pub(super) struct NextWindowUpdate; pub(super) struct NextWindowUpdate;
impl<B> Stream<B> { impl<B> Stream<B> {
pub fn new(id: StreamId) -> Stream<B> pub fn new(id: StreamId,
init_send_window: WindowSize,
init_recv_window: WindowSize) -> Stream<B>
{ {
let mut send_flow = FlowControl::new();
let mut recv_flow = FlowControl::new();
recv_flow.inc_window(init_recv_window)
.ok().expect("invalid initial receive window");
recv_flow.assign_capacity(init_recv_window);
send_flow.inc_window(init_send_window)
.ok().expect("invalid initial send window size");
Stream { Stream {
id, id,
state: State::default(), state: State::default(),
@@ -93,7 +105,7 @@ impl<B> Stream<B> {
next_pending_send: None, next_pending_send: None,
is_pending_send: false, is_pending_send: false,
send_flow: FlowControl::new(), send_flow: send_flow,
requested_send_capacity: 0, requested_send_capacity: 0,
buffered_send_data: 0, buffered_send_data: 0,
send_task: None, send_task: None,
@@ -106,7 +118,7 @@ impl<B> Stream<B> {
next_pending_accept: None, next_pending_accept: None,
is_pending_accept: false, is_pending_accept: false,
recv_flow: FlowControl::new(), recv_flow: recv_flow,
in_flight_recv_data: 0, in_flight_recv_data: 0,
next_window_update: None, next_window_update: None,
is_pending_window_update: false, is_pending_window_update: false,

View File

@@ -75,7 +75,14 @@ impl<B> Streams<B>
} }
match try!(me.actions.recv.open::<P>(id)) { match try!(me.actions.recv.open::<P>(id)) {
Some(stream) => e.insert(stream), Some(stream_id) => {
let stream = Stream::new(
stream_id,
me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz());
e.insert(stream)
}
None => return Ok(()), None => return Ok(()),
} }
} }
@@ -195,7 +202,8 @@ impl<B> Streams<B>
None => return Err(ProtocolError.into()), None => return Err(ProtocolError.into()),
}; };
me.actions.recv.recv_push_promise::<P>(frame, stream, &mut me.store) me.actions.recv.recv_push_promise::<P>(
frame, &me.actions.send, stream, &mut me.store)
} }
pub fn next_incoming(&mut self) -> Option<StreamRef<B>> { pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
@@ -273,11 +281,16 @@ impl<B> Streams<B>
let me = &mut *me; let me = &mut *me;
// Initialize a new stream. This fails if the connection is at capacity. // Initialize a new stream. This fails if the connection is at capacity.
let mut stream = me.actions.send.open::<client::Peer>()?; let stream_id = me.actions.send.open::<client::Peer>()?;
let stream = Stream::new(
stream_id,
me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz());
// Convert the message // Convert the message
let headers = client::Peer::convert_send_message( let headers = client::Peer::convert_send_message(
stream.id, request, end_of_stream); stream_id, request, end_of_stream);
let mut stream = me.store.insert(stream.id, stream); let mut stream = me.store.insert(stream.id, stream);