From e90a6e9250ca403cc3a367e79445eed41107e543 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 15 Jul 2017 22:50:13 +0000 Subject: [PATCH] wip: start splitting out stream management --- src/proto/connection.rs | 231 ++++++------------------------------ src/proto/flow_control.rs | 101 +++++++++++++++- src/proto/mod.rs | 30 +++-- src/proto/stream_tracker.rs | 102 ++++++++++++++-- 4 files changed, 247 insertions(+), 217 deletions(-) diff --git a/src/proto/connection.rs b/src/proto/connection.rs index b6d5a40..8f2f9ac 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,8 +1,7 @@ -use {Frame, FrameSize}; +use {ConnectionError, Frame, FrameSize}; use client::Client; -use error::{self, ConnectionError}; use frame::{self, StreamId}; -use proto::{self, Peer, ReadySink, StreamState, FlowController, WindowSize}; +use proto::{self, Peer, ReadySink, WindowSize}; use server::Server; use tokio_io::{AsyncRead, AsyncWrite}; @@ -12,152 +11,65 @@ use bytes::{Bytes, IntoBuf}; use futures::*; -use ordermap::OrderMap; -use fnv::FnvHasher; -use std::hash::BuildHasherDefault; use std::marker::PhantomData; /// An H2 connection #[derive(Debug)] pub struct Connection { - inner: proto::Transport, - streams: StreamMap, + inner: proto::Transport, peer: PhantomData

, - - /// Tracks the connection-level flow control window for receiving data from the - /// remote. - local_flow_controller: FlowController, - - /// Tracks the onnection-level flow control window for receiving data from the remote. - remote_flow_controller: FlowController, - - /// When `poll_window_update` is not ready, then the calling task is saved to be - /// notified later. Access to poll_window_update must not be shared across tasks. - blocked_window_update: Option, - - sending_window_update: Option, } -type StreamMap = OrderMap>; - -pub fn new(transport: proto::Transport) +pub fn new(transport: proto::Transport) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, B: IntoBuf, { - let recv_window_size = transport.local_settings().initial_window_size(); - let send_window_size = transport.remote_settings().initial_window_size(); Connection { inner: transport, - streams: StreamMap::default(), peer: PhantomData, - - local_flow_controller: FlowController::new(recv_window_size), - remote_flow_controller: FlowController::new(send_window_size), - - blocked_window_update: None, - sending_window_update: None, } } impl Connection { - #[inline] - fn claim_local_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { - self.local_flow_controller.claim_window(len) - .map_err(|_| error::Reason::FlowControlError.into()) - } - - #[inline] - fn claim_remote_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { - self.remote_flow_controller.claim_window(len) - .map_err(|_| error::User::FlowControlViolation.into()) - } - /// Polls for the amount of additional data that may be sent to a remote. /// /// Connection and stream updates are distinct. - pub fn poll_window_update(&mut self, id: StreamId) -> Poll { - let added = if id.is_zero() { - self.remote_flow_controller.take_window_update() - } else { - self.streams.get_mut(&id).and_then(|s| s.take_send_window_update()) - }; - - match added { - Some(incr) => Ok(Async::Ready(incr)), - None => { - self.blocked_window_update = Some(task::current()); - Ok(Async::NotReady) - } - } + pub fn poll_window_update(&mut self, _id: StreamId) -> Poll { + // let added = if id.is_zero() { + // self.remote_flow_controller.take_window_update() + // } else { + // self.streams.get_mut(&id).and_then(|s| s.take_send_window_update()) + // }; + // match added { + // Some(incr) => Ok(Async::Ready(incr)), + // None => { + // self.blocked_window_update = Some(task::current()); + // Ok(Async::NotReady) + // } + // } + unimplemented!() } - /// Increases the amount of data that the remote endpoint may send. /// /// Connection and stream updates are distinct. - pub fn increment_window_size(&mut self, id: StreamId, incr: WindowSize) { - assert!(self.sending_window_update.is_none()); - - let added = if id.is_zero() { - self.local_flow_controller.grow_window(incr); - self.local_flow_controller.take_window_update() - } else { - self.streams.get_mut(&id).and_then(|s| { - s.grow_recv_window(incr); - s.take_recv_window_update() - }) - }; - - if let Some(added) = added { - self.sending_window_update = Some(frame::WindowUpdate::new(id, added)); - } - } - - /// Handles a window update received from the remote, indicating that the local may - /// send `incr` additional bytes. - /// - /// Connection window updates (id=0) and stream window updates are advertised - /// distinctly. - fn increment_send_window_size(&mut self, id: StreamId, incr: WindowSize) { - if incr == 0 { - return; - } - - let added = if id.is_zero() { - self.remote_flow_controller.grow_window(incr); - true - } else if let Some(mut s) = self.streams.get_mut(&id) { - s.grow_send_window(incr); - true - } else { - false - }; - - if added { - if let Some(task) = self.blocked_window_update.take() { - task.notify(); - } - } - } -} - -impl Connection - where T: AsyncRead + AsyncWrite, - P: Peer, - B: IntoBuf -{ - /// Attempts to send a window update to the remote, if one is pending. - fn poll_sending_window_update(&mut self) -> Poll<(), ConnectionError> { - if let Some(f) = self.sending_window_update.take() { - if self.inner.start_send(f.into())?.is_not_ready() { - self.sending_window_update = Some(f); - return Ok(Async::NotReady); - } - } - - Ok(Async::Ready(())) + pub fn increment_window_size(&mut self, _id: StreamId, _incr: WindowSize) { + // assert!(self.sending_window_update.is_none()); + // let added = if id.is_zero() { + // self.local_flow_controller.grow_window(incr); + // self.local_flow_controller.take_window_update() + // } else { + // self.streams.get_mut(&id).and_then(|s| { + // s.grow_recv_window(incr); + // s.take_recv_window_update() + // }) + // }; + // if let Some(added) = added { + // self.sending_window_update = Some(frame::WindowUpdate::new(id, added)); + // } + unimplemented!() } } @@ -252,22 +164,6 @@ impl Stream for Connection let stream_id = v.stream_id(); let end_of_stream = v.is_end_stream(); - let init_window_size = self.inner.local_settings().initial_window_size(); - - let stream_initialized = try!(self.streams.entry(stream_id) - .or_insert(StreamState::default()) - .recv_headers::

(end_of_stream, init_window_size)); - - if stream_initialized { - // TODO: Ensure available capacity for a new stream - // This won't be as simple as self.streams.len() as closed - // connections should not be factored. - - if !P::is_valid_remote_stream_id(stream_id) { - unimplemented!(); - } - } - Frame::Headers { id: stream_id, headers: P::convert_poll_message(v), @@ -279,12 +175,6 @@ impl Stream for Connection let id = v.stream_id(); let end_of_stream = v.is_end_stream(); - self.claim_local_window(v.len())?; - match self.streams.get_mut(&id) { - None => return Err(error::Reason::ProtocolError.into()), - Some(state) => state.recv_data(end_of_stream, v.len())?, - } - Frame::Data { id, end_of_stream, @@ -293,16 +183,6 @@ impl Stream for Connection } } - Some(WindowUpdate(v)) => { - // When a window update is received from the remote, apply that update - // to the proper stream so that more data may be sent to the remote. - self.increment_send_window_size(v.stream_id(), v.size_increment()); - - // There's nothing to return yet, so continue attempting to read - // additional frames. - continue; - } - Some(frame) => panic!("unexpected frame; frame={:?}", frame), None => return Ok(Async::Ready(None)), }; @@ -332,33 +212,9 @@ impl Sink for Connection if self.poll_ready()? == Async::NotReady { return Ok(AsyncSink::NotReady(item)); } - assert!(self.sending_window_update.is_none()); match item { Frame::Headers { id, headers, end_of_stream } => { - let init_window_size = self.inner.remote_settings().initial_window_size(); - - // Transition the stream state, creating a new entry if needed - // - // TODO: Response can send multiple headers frames before body (1xx - // responses). - // - // ACTUALLY(ver), maybe not? - // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 - let stream_initialized = try!(self.streams.entry(id) - .or_insert(StreamState::default()) - .send_headers::

(end_of_stream, init_window_size)); - - if stream_initialized { - // TODO: Ensure available capacity for a new stream - // This won't be as simple as self.streams.len() as closed - // connections should not be factored. - if !P::is_valid_local_stream_id(id) { - // TODO: clear state - return Err(error::User::InvalidStreamId.into()); - } - } - let frame = P::convert_send_message(id, headers, end_of_stream); // We already ensured that the upstream can handle the frame, so @@ -373,15 +229,7 @@ impl Sink for Connection Ok(AsyncSink::Ready) } - Frame::Data { id, data, data_len, end_of_stream } => { - try!(self.claim_remote_window(data_len)); - - // The stream must be initialized at this point. - match self.streams.get_mut(&id) { - None => return Err(error::User::InactiveStreamId.into()), - Some(mut s) => try!(s.send_data(end_of_stream, data_len)), - } - + Frame::Data { id, data, end_of_stream, .. } => { let mut frame = frame::Data::from_buf(id, data.into_buf()); if end_of_stream { frame.set_end_stream(); @@ -412,13 +260,7 @@ impl Sink for Connection fn poll_complete(&mut self) -> Poll<(), ConnectionError> { trace!("poll_complete"); - - try_ready!(self.inner.poll_complete()); - - // TODO check for settings updates and update the initial window size of all - // streams. - - self.poll_sending_window_update() + self.inner.poll_complete() } } @@ -429,7 +271,6 @@ impl ReadySink for Connection { fn poll_ready(&mut self) -> Poll<(), Self::SinkError> { trace!("poll_ready"); - try_ready!(self.inner.poll_ready()); - self.poll_sending_window_update() + self.inner.poll_ready() } } diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 012cb61..d07b075 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -1,6 +1,7 @@ use ConnectionError; +use error; use frame::{self, Frame}; -use proto::{ReadySink, StreamMap, ConnectionTransporter, StreamTransporter}; +use proto::*; use futures::*; @@ -9,6 +10,19 @@ pub struct FlowControl { inner: T, initial_local_window_size: u32, initial_remote_window_size: u32, + + /// Tracks the connection-level flow control window for receiving data from the + /// remote. + local_flow_controller: FlowController, + + /// Tracks the onnection-level flow control window for receiving data from the remote. + remote_flow_controller: FlowController, + + /// When `poll_window_update` is not ready, then the calling task is saved to be + /// notified later. Access to poll_window_update must not be shared across tasks. + blocked_window_update: Option, + + sending_window_update: Option, } impl FlowControl @@ -25,10 +39,71 @@ impl FlowControl inner, initial_local_window_size, initial_remote_window_size, + local_flow_controller: FlowController::new(initial_local_window_size), + remote_flow_controller: FlowController::new(initial_remote_window_size), + blocked_window_update: None, + sending_window_update: None, } } } +impl FlowControl { + #[inline] + fn claim_local_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { + self.local_flow_controller.claim_window(len) + .map_err(|_| error::Reason::FlowControlError.into()) + } + + #[inline] + fn claim_remote_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { + self.remote_flow_controller.claim_window(len) + .map_err(|_| error::User::FlowControlViolation.into()) + } +} + +impl FlowControl { + /// Handles a window update received from the remote, indicating that the local may + /// send `incr` additional bytes. + /// + /// Connection window updates (id=0) and stream window updates are advertised + /// distinctly. + fn grow_remote_window(&mut self, id: StreamId, incr: WindowSize) { + if incr == 0 { + return; + } + let added = if id.is_zero() { + self.remote_flow_controller.grow_window(incr); + true + } else if let Some(mut s) = self.streams_mut().get_mut(&id) { + s.grow_send_window(incr); + true + } else { + false + }; + if added { + if let Some(task) = self.blocked_window_update.take() { + task.notify(); + } + } + } +} + +impl FlowControl + where T: Sink, SinkError = ConnectionError>, +{ + /// Attempts to send a window update to the remote, if one is pending. + fn poll_sending_window_update(&mut self) -> Poll<(), ConnectionError> { + if let Some(f) = self.sending_window_update.take() { + if self.inner.start_send(f.into())?.is_not_ready() { + self.sending_window_update = Some(f); + return Ok(Async::NotReady); + } + } + + Ok(Async::Ready(())) + } +} + /// Applies an update to an endpoint's initial window size. /// /// Per RFC 7540 ยง6.9.2 @@ -116,7 +191,23 @@ impl Stream for FlowControl type Error = T::Error; fn poll(&mut self) -> Poll, T::Error> { - self.inner.poll() + use frame::Frame::*; + trace!("poll"); + + loop { + match try_ready!(self.inner.poll()) { + Some(WindowUpdate(v)) => { + self.grow_remote_window(v.stream_id(), v.size_increment()); + } + + Some(Data(v)) => { + self.claim_local_window(v.len())?; + return Ok(Async::Ready(Some(Data(v)))); + } + + v => return Ok(Async::Ready(v)), + } + } } } @@ -129,6 +220,12 @@ impl Sink for FlowControl type SinkError = T::SinkError; fn start_send(&mut self, item: Frame) -> StartSend { + use frame::Frame::*; + + if let &Data(ref v) = &item { + self.claim_remote_window(v.len())?; + } + self.inner.start_send(item) } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 6e2e06f..6d2ee4d 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -27,7 +27,7 @@ use tokio_io::codec::length_delimited; use bytes::{Buf, IntoBuf}; -use ordermap::OrderMap; +use ordermap::{Entry, OrderMap}; use fnv::FnvHasher; use std::hash::BuildHasherDefault; @@ -45,13 +45,14 @@ use std::hash::BuildHasherDefault; /// /// All transporters below Settings must apply relevant settings before passing a frame on /// to another level. For example, if the frame writer n -type Transport = +type Transport = Settings< FlowControl< StreamTracker< PingPong< Framer, - B>>>>; + B>, + P>>>; type Framer = FramedRead< @@ -65,6 +66,14 @@ pub struct StreamMap { } impl StreamMap { + fn get_mut(&mut self, id: &StreamId) -> Option<&mut StreamState> { + self.inner.get_mut(id) + } + + fn entry(&mut self, id: StreamId) -> Entry> { + self.inner.entry(id) + } + fn shrink_local_window(&mut self, decr: u32) { for (_, mut s) in &mut self.inner { s.shrink_recv_window(decr) @@ -159,10 +168,17 @@ pub fn from_server_handshaker(settings: Settings .num_skip(0) // Don't skip the header .new_read(io); - FlowControl::new(initial_local_window_size, initial_remote_window_size, - StreamTracker::new(local_max_concurrency, remote_max_concurrency, - PingPong::new( - FramedRead::new(framer)))) + FlowControl::new( + initial_local_window_size, + initial_remote_window_size, + StreamTracker::new( + initial_local_window_size, + initial_remote_window_size, + local_max_concurrency, + remote_max_concurrency, + PingPong::new(FramedRead::new(framer)) + ) + ) }); connection::new(transport) diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs index b962e6f..1cfe35a 100644 --- a/src/proto/stream_tracker.rs +++ b/src/proto/stream_tracker.rs @@ -1,36 +1,48 @@ use ConnectionError; +use error::User::*; use frame::{self, Frame}; -use proto::{ReadySink, StreamMap, ConnectionTransporter, StreamTransporter}; +use proto::*; use futures::*; +use std::marker::PhantomData; + #[derive(Debug)] -pub struct StreamTracker { +pub struct StreamTracker { inner: T, + peer: PhantomData

, streams: StreamMap, local_max_concurrency: Option, remote_max_concurrency: Option, + initial_local_window_size: WindowSize, + initial_remote_window_size: WindowSize, } -impl StreamTracker +impl StreamTracker where T: Stream, - T: Sink, SinkError = ConnectionError> + T: Sink, SinkError = ConnectionError>, + P: Peer { - pub fn new(local_max_concurrency: Option, + pub fn new(initial_local_window_size: WindowSize, + initial_remote_window_size: WindowSize, + local_max_concurrency: Option, remote_max_concurrency: Option, inner: T) - -> StreamTracker + -> StreamTracker { StreamTracker { inner, + peer: PhantomData, streams: StreamMap::default(), local_max_concurrency, remote_max_concurrency, + initial_local_window_size, + initial_remote_window_size, } } } -impl StreamTransporter for StreamTracker { +impl StreamTransporter for StreamTracker { fn streams(&self) -> &StreamMap { &self.streams } @@ -58,38 +70,101 @@ impl StreamTransporter for StreamTracker { /// > exceed the new value or allow streams to complete. /// /// This module does NOT close streams when the setting changes. -impl ConnectionTransporter for StreamTracker { +impl ConnectionTransporter for StreamTracker + where T: ConnectionTransporter +{ fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.local_max_concurrency = set.max_concurrent_streams(); + self.initial_local_window_size = set.initial_window_size(); self.inner.apply_local_settings(set) } fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.remote_max_concurrency = set.max_concurrent_streams(); + self.initial_remote_window_size = set.initial_window_size(); self.inner.apply_remote_settings(set) } } -impl Stream for StreamTracker - where T: Stream, Error = ConnectionError>, +impl Stream for StreamTracker + where T: Stream, + P: Peer, { type Item = T::Item; type Error = T::Error; fn poll(&mut self) -> Poll, T::Error> { - self.inner.poll() + use frame::Frame::*; + + match try_ready!(self.inner.poll()) { + Some(Headers(v)) => { + let id = v.stream_id(); + let eos = v.is_end_stream(); + + let initialized = self.streams + .entry(id) + .or_insert_with(|| StreamState::default()) + .recv_headers::

(eos, self.initial_local_window_size)?; + + if initialized { + // TODO: Ensure available capacity for a new stream + // This won't be as simple as self.streams.len() as closed + // connections should not be factored. + + if !P::is_valid_remote_stream_id(id) { + unimplemented!(); + } + } + + Ok(Async::Ready(Some(Headers(v)))) + } + + f => Ok(Async::Ready(f)) + + } } } -impl Sink for StreamTracker +impl Sink for StreamTracker where T: Sink, SinkError = ConnectionError>, + P: Peer, { type SinkItem = T::SinkItem; type SinkError = T::SinkError; fn start_send(&mut self, item: T::SinkItem) -> StartSend { + use frame::Frame::*; + + if let &Headers(ref v) = &item { + let id = v.stream_id(); + let eos = v.is_end_stream(); + + // Transition the stream state, creating a new entry if needed + // + // TODO: Response can send multiple headers frames before body (1xx + // responses). + // + // ACTUALLY(ver), maybe not? + // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 + let initialized = self.streams + .entry(id) + .or_insert_with(|| StreamState::default()) + .send_headers::

(eos, self.initial_remote_window_size)?; + + if initialized { + // TODO: Ensure available capacity for a new stream + // This won't be as simple as self.streams.len() as closed + // connections should not be factored. + if !P::is_valid_local_stream_id(id) { + // TODO: clear state + return Err(InvalidStreamId.into()); + } + } + } + self.inner.start_send(item) + } fn poll_complete(&mut self) -> Poll<(), T::SinkError> { @@ -98,10 +173,11 @@ impl Sink for StreamTracker } -impl ReadySink for StreamTracker +impl ReadySink for StreamTracker where T: Stream, T: Sink, SinkError = ConnectionError>, T: ReadySink, + P: Peer, { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { self.inner.poll_ready()