Implement Client::poll_ready (#21)

Client::poll_ready ensures that the connection is ale to to initiate a new request stream to the remote server.  When the server is at capacity, a task is stored to be notified when capacity is available.
This commit is contained in:
Oliver Gould
2017-08-15 13:46:55 -07:00
committed by Carl Lerche
parent 150c3160be
commit e015d7bfba
4 changed files with 42 additions and 17 deletions

View File

@@ -76,7 +76,7 @@ impl<T, B> Client<T, B>
/// Returns `Ready` when the connection can initialize a new HTTP 2.0 /// Returns `Ready` when the connection can initialize a new HTTP 2.0
/// stream. /// stream.
pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
unimplemented!(); self.connection.poll_send_request_ready()
} }
/// Send a request on a new HTTP 2.0 stream /// Send a request on a new HTTP 2.0 stream

View File

@@ -45,7 +45,7 @@ impl<T, P, B> Connection<T, P, B>
} }
/// Returns `Ready` when the connection is ready to receive a frame. /// Returns `Ready` when the connection is ready to receive a frame.
pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
// The order of these calls don't really matter too much as only one // The order of these calls don't really matter too much as only one
// should have pending work. // should have pending work.
try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); try_ready!(self.ping_pong.send_pending_pong(&mut self.codec));
@@ -55,6 +55,11 @@ impl<T, P, B> Connection<T, P, B>
Ok(().into()) Ok(().into())
} }
/// Returns `Ready` when new the connection is able to support a new request stream.
pub fn poll_send_request_ready(&mut self) -> Poll<(), ConnectionError> {
self.streams.poll_send_request_ready()
}
/// Advances the internal state of the connection. /// Advances the internal state of the connection.
pub fn poll(&mut self) -> Poll<(), ConnectionError> { pub fn poll(&mut self) -> Poll<(), ConnectionError> {
match self.poll2() { match self.poll2() {

View File

@@ -27,19 +27,17 @@ pub(super) struct Send<B> {
/// List of streams waiting for outbound connection capacity /// List of streams waiting for outbound connection capacity
pending_capacity: store::List<B>, pending_capacity: store::List<B>,
/// Task awaiting notification to open a new stream.
blocked_open: Option<task::Task>,
/// Prioritization layer /// Prioritization layer
prioritize: Prioritize<B>, prioritize: Prioritize<B>,
} }
impl<B> Send<B> where B: Buf { impl<B> Send<B> where B: Buf {
/// Create a new `Send` /// Create a new `Send`
pub fn new<P: Peer>(config: &Config) -> Self { pub fn new<P: Peer>(config: &Config) -> Self {
let next_stream_id = if P::is_server() { let next_stream_id = if P::is_server() { 2 } else { 1 };
2
} else {
1
};
Send { Send {
max_streams: config.max_local_initiated, max_streams: config.max_local_initiated,
@@ -47,10 +45,24 @@ impl<B> Send<B> where B: Buf {
next_stream_id: next_stream_id.into(), next_stream_id: next_stream_id.into(),
init_window_sz: config.init_local_window_sz, init_window_sz: config.init_local_window_sz,
pending_capacity: store::List::new(), pending_capacity: store::List::new(),
blocked_open: None,
prioritize: Prioritize::new(config), prioritize: Prioritize::new(config),
} }
} }
pub fn poll_open_ready<P: Peer>(&mut self) -> Poll<(), ConnectionError> {
try!(self.ensure_can_open::<P>());
if let Some(max) = self.max_streams {
if max <= self.num_streams {
self.blocked_open = Some(task::current());
return Ok(Async::NotReady);
}
}
return Ok(Async::Ready(()));
}
/// Update state reflecting a new, locally opened stream /// Update state reflecting a new, locally opened stream
/// ///
/// Returns the stream state if successful. `None` if refused /// Returns the stream state if successful. `None` if refused
@@ -129,9 +141,9 @@ impl<B> Send<B> where B: Buf {
} }
if stream.state.is_closed() { if stream.state.is_closed() {
return Err(InactiveStreamId.into()) return Err(InactiveStreamId.into());
} else { } else {
return Err(UnexpectedFrameType.into()) return Err(UnexpectedFrameType.into());
} }
} }
@@ -275,7 +287,8 @@ impl<B> Send<B> where B: Buf {
pub fn apply_remote_settings(&mut self, pub fn apply_remote_settings(&mut self,
settings: &frame::Settings, settings: &frame::Settings,
store: &mut Store<B>) { store: &mut Store<B>)
{
if let Some(val) = settings.max_concurrent_streams() { if let Some(val) = settings.max_concurrent_streams() {
self.max_streams = Some(val as usize); self.max_streams = Some(val as usize);
} }
@@ -342,6 +355,12 @@ impl<B> Send<B> where B: Buf {
pub fn dec_num_streams(&mut self) { pub fn dec_num_streams(&mut self) {
self.num_streams -= 1; self.num_streams -= 1;
if self.num_streams < self.max_streams.unwrap_or(::std::usize::MAX) {
if let Some(task) = self.blocked_open.take() {
task.notify();
}
}
} }
/// Returns true if the local actor can initiate a stream with the given ID. /// Returns true if the local actor can initiate a stream with the given ID.

View File

@@ -5,8 +5,6 @@ use super::*;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
// TODO: All the VecDeques should become linked lists using the State
// values.
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Streams<B> { pub(crate) struct Streams<B> {
inner: Arc<Mutex<Inner<B>>>, inner: Arc<Mutex<Inner<B>>>,
@@ -256,11 +254,14 @@ impl<B> Streams<B>
me.actions.send.apply_remote_settings(frame, &mut me.store); me.actions.send.apply_remote_settings(frame, &mut me.store);
} }
pub fn poll_send_request_ready(&mut self) -> Poll<(), ConnectionError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
me.actions.send.poll_open_ready::<client::Peer>()
} }
impl<B> Streams<B>
where B: Buf,
{
pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool)
-> Result<StreamRef<B>, ConnectionError> -> Result<StreamRef<B>, ConnectionError>
{ {