diff --git a/src/client.rs b/src/client.rs index c0c58ee..89ac90f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -76,7 +76,7 @@ impl Client /// Returns `Ready` when the connection can initialize a new HTTP 2.0 /// stream. pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - unimplemented!(); + self.connection.poll_send_request_ready() } /// Send a request on a new HTTP 2.0 stream diff --git a/src/proto/connection.rs b/src/proto/connection.rs index b06eb2c..adb800c 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -45,7 +45,7 @@ impl Connection } /// Returns `Ready` when the connection is ready to receive a frame. - pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { // The order of these calls don't really matter too much as only one // should have pending work. try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); @@ -55,6 +55,11 @@ impl Connection Ok(().into()) } + /// Returns `Ready` when new the connection is able to support a new request stream. + pub fn poll_send_request_ready(&mut self) -> Poll<(), ConnectionError> { + self.streams.poll_send_request_ready() + } + /// Advances the internal state of the connection. pub fn poll(&mut self) -> Poll<(), ConnectionError> { match self.poll2() { diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index e9ae6c9..250be77 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -27,19 +27,17 @@ pub(super) struct Send { /// List of streams waiting for outbound connection capacity pending_capacity: store::List, + /// Task awaiting notification to open a new stream. + blocked_open: Option, + /// Prioritization layer prioritize: Prioritize, } impl Send where B: Buf { - /// Create a new `Send` pub fn new(config: &Config) -> Self { - let next_stream_id = if P::is_server() { - 2 - } else { - 1 - }; + let next_stream_id = if P::is_server() { 2 } else { 1 }; Send { max_streams: config.max_local_initiated, @@ -47,10 +45,24 @@ impl Send where B: Buf { next_stream_id: next_stream_id.into(), init_window_sz: config.init_local_window_sz, pending_capacity: store::List::new(), + blocked_open: None, prioritize: Prioritize::new(config), } } + pub fn poll_open_ready(&mut self) -> Poll<(), ConnectionError> { + try!(self.ensure_can_open::

()); + + if let Some(max) = self.max_streams { + if max <= self.num_streams { + self.blocked_open = Some(task::current()); + return Ok(Async::NotReady); + } + } + + return Ok(Async::Ready(())); + } + /// Update state reflecting a new, locally opened stream /// /// Returns the stream state if successful. `None` if refused @@ -129,9 +141,9 @@ impl Send where B: Buf { } if stream.state.is_closed() { - return Err(InactiveStreamId.into()) + return Err(InactiveStreamId.into()); } else { - return Err(UnexpectedFrameType.into()) + return Err(UnexpectedFrameType.into()); } } @@ -275,7 +287,8 @@ impl Send where B: Buf { pub fn apply_remote_settings(&mut self, settings: &frame::Settings, - store: &mut Store) { + store: &mut Store) + { if let Some(val) = settings.max_concurrent_streams() { self.max_streams = Some(val as usize); } @@ -342,6 +355,12 @@ impl Send where B: Buf { pub fn dec_num_streams(&mut self) { self.num_streams -= 1; + + if self.num_streams < self.max_streams.unwrap_or(::std::usize::MAX) { + if let Some(task) = self.blocked_open.take() { + task.notify(); + } + } } /// Returns true if the local actor can initiate a stream with the given ID. diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index ff3a0b7..29b31d9 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -5,8 +5,6 @@ use super::*; use std::marker::PhantomData; use std::sync::{Arc, Mutex}; -// TODO: All the VecDeques should become linked lists using the State -// values. #[derive(Debug)] pub(crate) struct Streams { inner: Arc>>, @@ -256,11 +254,14 @@ impl Streams me.actions.send.apply_remote_settings(frame, &mut me.store); } -} -impl Streams - where B: Buf, -{ + pub fn poll_send_request_ready(&mut self) -> Poll<(), ConnectionError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions.send.poll_open_ready::() + } + pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) -> Result, ConnectionError> {