From cbd3e172835b12f457e48b2cd22c312614be71eb Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 13 Jul 2017 02:24:36 +0000 Subject: [PATCH] ok, starting to look good --- src/frame/data.rs | 18 +++--- src/frame/settings.rs | 3 +- src/lib.rs | 4 +- src/proto/connection.rs | 130 ++++++++++++++++++++++++-------------- src/proto/flow_control.rs | 24 ++++--- src/proto/framed_read.rs | 58 ++++++++--------- src/proto/framed_write.rs | 8 +-- src/proto/mod.rs | 3 + src/proto/state.rs | 50 ++++++++------- 9 files changed, 168 insertions(+), 130 deletions(-) diff --git a/src/frame/data.rs b/src/frame/data.rs index 23d5dcf..7bde76e 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -1,10 +1,11 @@ +use FrameSize; use frame::{util, Frame, Head, Error, StreamId, Kind}; use bytes::{BufMut, Bytes, Buf}; #[derive(Debug)] pub struct Data { stream_id: StreamId, - data_len: usize, + data_len: FrameSize, data: T, flags: DataFlag, pad_len: Option, @@ -29,7 +30,7 @@ impl Data { }; Ok(Data { stream_id: head.stream_id(), - data_len: payload.len(), + data_len: payload.len() as FrameSize, data: payload, flags: flags, pad_len: pad_len, @@ -54,7 +55,7 @@ impl Data { Head::new(Kind::Data, self.flags.into(), self.stream_id) } - pub fn len(&self) -> usize { + pub fn len(&self) -> FrameSize { self.data_len } @@ -66,20 +67,21 @@ impl Data { impl Data { pub fn from_buf(stream_id: StreamId, data: T) -> Self { Data { - stream_id: stream_id, - data_len: data.remaining(), - data: data, + stream_id, + data_len: data.remaining() as FrameSize, + data, flags: DataFlag::default(), pad_len: None, } } pub fn encode_chunk(&mut self, dst: &mut U) { - if self.len() > dst.remaining_mut() { + let len = self.len() as usize; + if len > dst.remaining_mut() { unimplemented!(); } - self.head().encode(self.len(), dst); + self.head().encode(len, dst); dst.put(&mut self.data); } } diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 52de210..70c5d9c 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -1,3 +1,4 @@ +use FrameSize; use frame::{Frame, Error, Head, Kind, StreamId}; use bytes::{BytesMut, BufMut, BigEndian}; @@ -45,7 +46,7 @@ const ACK: u8 = 0x1; const ALL: u8 = ACK; pub const DEFAULT_SETTINGS_HEADER_TABLE_SIZE: usize = 4_096; -pub const DEFAULT_MAX_FRAME_SIZE: usize = 16_384; +pub const DEFAULT_MAX_FRAME_SIZE: FrameSize = 16_384; // ===== impl Settings ===== diff --git a/src/lib.rs b/src/lib.rs index 615f1c5..841a497 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,8 @@ pub use proto::Connection; use bytes::Bytes; +pub type FrameSize = u32; + /// An H2 connection frame #[derive(Debug)] pub enum Frame { @@ -52,7 +54,7 @@ pub enum Frame { id: StreamId, data: B, /// TODO figure out how to make this a requirement on `B` - data_len: usize, + data_len: FrameSize, end_of_stream: bool, }, Trailers { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index d5eb996..f01d0ca 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,8 +1,8 @@ -use Frame; +use {Frame, FrameSize}; use client::Client; use error::{self, ConnectionError}; use frame::{self, StreamId}; -use proto::{self, Peer, ReadySink, State, FlowController}; +use proto::{self, Peer, ReadySink, State, FlowController, WindowSize}; use server::Server; use tokio_io::{AsyncRead, AsyncWrite}; @@ -26,12 +26,12 @@ pub struct Connection { peer: PhantomData

, /// Tracks connection-level flow control. - local_flow_controller: FlowController, - remote_flow_controller: FlowController, + recv_flow_controller: FlowController, + send_flow_controller: FlowController, - pending_local_window_update: Option, - blocked_remote_window_update: Option, + pending_send_window_update: Option, + blocked_recv_window_update: Option, } type StreamMap = OrderMap>; @@ -42,37 +42,47 @@ pub fn new(transport: proto::Inner) P: Peer, B: IntoBuf, { - let local_window_size = transport.local_settings().initial_window_size(); - let remote_window_size = transport.remote_settings().initial_window_size(); + let recv_window_size = transport.local_settings().initial_window_size(); + let send_window_size = transport.remote_settings().initial_window_size(); Connection { inner: transport, streams: StreamMap::default(), peer: PhantomData, - local_flow_controller: FlowController::new(local_window_size), - remote_flow_controller: FlowController::new(remote_window_size), + recv_flow_controller: FlowController::new(recv_window_size), + send_flow_controller: FlowController::new(send_window_size), - pending_local_window_update: None, - blocked_remote_window_update: None, + pending_send_window_update: None, + blocked_recv_window_update: None, } } impl Connection { - pub fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { - if id.is_zero() { - return match self.local_flow_controller.take_window_update() { - Some(incr) => Ok(Async::Ready(incr)), - None => { - self.blocked_remote_window_update = Some(task::current()); - Ok(Async::NotReady) - } - }; - } + #[inline] + fn claim_connection_recv_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { + self.recv_flow_controller.claim_window(len) + .map_err(|_| error::Reason::FlowControlError.into()) + } - match self.streams.get_mut(&id).and_then(|mut s| s.take_remote_window_update()) { + #[inline] + fn claim_connection_send_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { + self.send_flow_controller.claim_window(len) + .map_err(|_| error::Reason::FlowControlError.into()) + } + + // TODO check max frame size + + pub fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { + let added = if id.is_zero() { + self.send_flow_controller.take_window_update() + } else { + self.streams.get_mut(&id).and_then(|mut s| s.take_recv_window_update()) + }; + + match added { Some(incr) => Ok(Async::Ready(incr)), None => { - self.blocked_remote_window_update = Some(task::current()); + self.blocked_recv_window_update = Some(task::current()); Ok(Async::NotReady) } } @@ -83,18 +93,18 @@ impl Connection { /// /// Connection window updates (StreamId=0) and stream window must be published /// distinctly. - pub fn init_send_window_update(&mut self, id: StreamId, incr: u32) { - assert!(self.pending_local_window_update.is_none()); + pub fn init_send_window_update(&mut self, id: StreamId, incr: WindowSize) { + assert!(self.pending_send_window_update.is_none()); let added = if id.is_zero() { - self.remote_flow_controller.add_to_window(incr); - self.remote_flow_controller.take_window_update() + self.send_flow_controller.add_to_window(incr); + self.send_flow_controller.take_window_update() } else { self.streams.get_mut(&id).and_then(|mut s| s.send_window_update(incr)) }; if let Some(added) = added { - self.pending_local_window_update = Some(frame::WindowUpdate::new(id, added)); + self.pending_send_window_update = Some(frame::WindowUpdate::new(id, added)); } } @@ -102,9 +112,9 @@ impl Connection { /// /// Connection window updates (id=0) and stream window updates are advertised /// distinctly. - fn recv_window_update(&mut self, id: StreamId, incr: u32) { + fn recv_window_update(&mut self, id: StreamId, incr: WindowSize) { if id.is_zero() { - return self.remote_flow_controller.add_to_window(incr); + return self.recv_flow_controller.add_to_window(incr); } if let Some(mut s) = self.streams.get_mut(&id) { @@ -116,28 +126,34 @@ impl Connection { impl Connection where T: AsyncRead + AsyncWrite, P: Peer, - B: IntoBuf, + B: IntoBuf { + /// Attempts to send a window update to the remote. fn poll_send_window_update(&mut self) -> Poll<(), ConnectionError> { - if let Some(f) = self.pending_local_window_update.take() { + if let Some(f) = self.pending_send_window_update.take() { if self.inner.start_send(f.into())?.is_not_ready() { - self.pending_local_window_update = Some(f); + self.pending_send_window_update = Some(f); return Ok(Async::NotReady); } } Ok(Async::Ready(())) } +} +// Note: this is bytes-specific for now so that we can know the payload's length. +impl Connection + where T: AsyncRead + AsyncWrite, + P: Peer, +{ pub fn send_data(self, id: StreamId, - data: B, - data_len: usize, + data: Bytes, end_of_stream: bool) -> sink::Send { self.send(Frame::Data { id, - data_len, + data_len: data.len() as FrameSize, data, end_of_stream, }) @@ -196,10 +212,15 @@ impl Stream for Connection let frame = match try!(self.inner.poll()) { Async::Ready(f) => f, Async::NotReady => { - // Because receiving new frames may depend on ensuring that the - // write buffer is clear, `poll_complete` is called here. - let _ = try!(self.poll_complete()); - return Ok(Async::NotReady); + // Receiving new frames may depend on ensuring that the write buffer + // is clear (e.g. if window updates need to be sent), so `poll_ready` + // is called here. + try_ready!(self.poll_ready()); + + // If the snder sink is ready, we attempt to poll the underlying + // stream once more because it, may have been made ready by flushing + // the sink. + try_ready!(self.inner.poll()) } }; @@ -234,22 +255,30 @@ impl Stream for Connection } Some(Data(v)) => { - let stream_id = v.stream_id(); + let id = v.stream_id(); let end_of_stream = v.is_end_stream(); - match self.streams.get_mut(&stream_id) { + + self.claim_connection_recv_window(v.len())?; + match self.streams.get_mut(&id) { None => return Err(error::Reason::ProtocolError.into()), - Some(state) => try!(state.recv_data(end_of_stream, v.len())), + Some(state) => state.recv_data(end_of_stream, v.len())?, } + Frame::Data { - id: stream_id, + id, + end_of_stream, data_len: v.len(), data: v.into_payload(), - end_of_stream, } } Some(WindowUpdate(v)) => { + // When a window update is read from the remote, apply that update to + // the proper stream. self.recv_window_update(v.stream_id(), v.size_increment()); + + // There's nothing to return yet, so continue attempting to read + // additional frames. continue; } @@ -278,11 +307,11 @@ impl Sink for Connection // First ensure that the upstream can process a new item. This ensures, for // instance, that any pending local window updates have been sent to the remote - // before sending any other frames. + // before sending any other (i.e. DATA) frames. if try!(self.poll_ready()).is_not_ready() { return Ok(AsyncSink::NotReady(item)); } - assert!(self.pending_local_window_update.is_none()); + assert!(self.pending_send_window_update.is_none()); match item { Frame::Headers { id, headers, end_of_stream } => { @@ -324,6 +353,8 @@ impl Sink for Connection } Frame::Data { id, data, data_len, end_of_stream } => { + self.claim_connection_send_window(data_len)?; + // The stream must be initialized at this point match self.streams.get_mut(&id) { None => return Err(error::User::InactiveStreamId.into()), @@ -331,14 +362,15 @@ impl Sink for Connection } let mut frame = frame::Data::from_buf(id, data.into_buf()); - if end_of_stream { frame.set_end_stream(); } let res = try!(self.inner.start_send(frame.into())); + // poll_ready has already been called. assert!(res.is_ready()); + Ok(AsyncSink::Ready) } diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index bc7dc13..2ead575 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -1,17 +1,19 @@ +use proto::WindowSize; + #[derive(Clone, Copy, Debug)] pub struct WindowUnderflow; -pub const DEFAULT_INITIAL_WINDOW_SIZE: u32 = 65_535; +pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; #[derive(Copy, Clone, Debug)] pub struct FlowController { /// Amount that may be claimed. - window_size: u32, + window_size: WindowSize, /// Amount to be removed by future increments. - underflow: u32, + underflow: WindowSize, /// The amount that has been incremented but not yet advertised (to the application or /// the remote). - next_window_update: u32, + next_window_update: WindowSize, } impl Default for FlowController { @@ -21,7 +23,7 @@ impl Default for FlowController { } impl FlowController { - pub fn new(window_size: u32) -> FlowController { + pub fn new(window_size: WindowSize) -> FlowController { FlowController { window_size, underflow: 0, @@ -29,19 +31,15 @@ impl FlowController { } } - pub fn window_size(&self) -> u32 { - self.window_size - } - /// Reduce future capacity of the window. /// /// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE. - pub fn shrink_window(&mut self, decr: u32) { + pub fn shrink_window(&mut self, decr: WindowSize) { self.underflow += decr; } /// Claim the provided amount from the window, if there is enough space. - pub fn claim_window(&mut self, sz: u32) -> Result<(), WindowUnderflow> { + pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> { if self.window_size < sz { return Err(WindowUnderflow); } @@ -51,7 +49,7 @@ impl FlowController { } /// Applies a window increment immediately. - pub fn add_to_window(&mut self, sz: u32) { + pub fn add_to_window(&mut self, sz: WindowSize) { if sz <= self.underflow { self.underflow -= sz; return; @@ -64,7 +62,7 @@ impl FlowController { } /// Obtains and clears an unadvertised window update. - pub fn take_window_update(&mut self) -> Option { + pub fn take_window_update(&mut self) -> Option { if self.next_window_update == 0 { return None; } diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 8b630a4..a94a2be 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -46,12 +46,24 @@ impl FramedRead { unimplemented!(); } - let frame = match head.kind() { + let kind = head.kind(); + debug!("received {:?}", kind); + + let frame = match kind { + Kind::Settings => { + frame::Settings::load(head, &bytes[frame::HEADER_LEN..])?.into() + } + Kind::Ping => { + frame::Ping::load(head, &bytes[frame::HEADER_LEN..])?.into() + } + Kind::WindowUpdate => { + frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..])?.into() + } Kind::Data => { let _ = bytes.split_to(frame::HEADER_LEN); - let frame = try!(frame::Data::load(head, bytes)); - frame.into() + frame::Data::load(head, bytes)?.into() } + Kind::Headers => { let mut buf = Cursor::new(bytes); buf.set_position(frame::HEADER_LEN as u64); @@ -67,41 +79,25 @@ impl FramedRead { frame.into() } - Kind::Priority => unimplemented!(), + + // TODO + Kind::Reset => { - let frame = try!(frame::Reset::load(head, &bytes[frame::HEADER_LEN..])); - debug!("decoded; frame={:?}", frame); - // TODO: implement - return Ok(None); - } - Kind::Settings => { - let frame = try!(frame::Settings::load(head, &bytes[frame::HEADER_LEN..])); - frame.into() - } - Kind::PushPromise => { - debug!("received PUSH_PROMISE"); - // TODO: implement - return Ok(None); - } - Kind::Ping => { - try!(frame::Ping::load(head, &bytes[frame::HEADER_LEN..])).into() + let _todo = try!(frame::Reset::load(head, &bytes[frame::HEADER_LEN..])); + unimplemented!(); } Kind::GoAway => { - let frame = try!(frame::GoAway::load(&bytes[frame::HEADER_LEN..])); - debug!("decoded; frame={:?}", frame); + let _todo = try!(frame::GoAway::load(&bytes[frame::HEADER_LEN..])); unimplemented!(); } - Kind::WindowUpdate => { - // TODO: IMPLEMENT - let frame = try!(frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..])); - debug!("decoded; frame={:?}", frame); - return Ok(None); - }, - Kind::Continuation => { - unimplemented!(); + Kind::PushPromise | + Kind::Priority | + Kind::Continuation | + Kind::Unknown => { + unimplemented!() } - Kind::Unknown => return Ok(None), }; + debug!("decoded; frame={:?}", frame); Ok(Some(frame)) } diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index 2bcfe71..307f660 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -1,4 +1,4 @@ -use {hpack, ConnectionError}; +use {hpack, ConnectionError, FrameSize}; use frame::{self, Frame}; use proto::ReadySink; @@ -24,7 +24,7 @@ pub struct FramedWrite { next: Option>, /// Max frame size, this is specified by the peer - max_frame_size: usize, + max_frame_size: FrameSize, } #[derive(Debug)] @@ -74,7 +74,7 @@ impl FramedWrite } fn frame_len(&self, data: &frame::Data) -> usize { - cmp::min(self.max_frame_size, data.len()) + cmp::min(self.max_frame_size, data.len()) as usize } } @@ -94,7 +94,7 @@ impl Sink for FramedWrite match item { Frame::Data(mut v) => { - if v.len() >= CHAIN_THRESHOLD { + if v.len() >= (CHAIN_THRESHOLD as FrameSize) { let head = v.head(); let len = self.frame_len(&v); diff --git a/src/proto/mod.rs b/src/proto/mod.rs index ecc1c30..5fb5c12 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -35,6 +35,9 @@ type Framed = FramedRead< FramedWrite>; + +pub type WindowSize = u32; + /// Create a full H2 transport from an I/O handle. /// /// This is called as the final step of the client handshake future. diff --git a/src/proto/state.rs b/src/proto/state.rs index c4a8a72..be8967c 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -1,4 +1,4 @@ -use Peer; +use {FrameSize, Peer}; use error::ConnectionError; use error::Reason::*; use error::User::*; @@ -102,7 +102,7 @@ impl State { } } - pub fn take_remote_window_update(&mut self) -> Option { + pub fn take_recv_window_update(&mut self) -> Option { use self::State::*; use self::PeerState::*; @@ -129,7 +129,7 @@ impl State { /// > flow-control window and MUST NOT send new flow-controlled frames until it /// > receives WINDOW_UPDATE frames that cause the flow-control window to become /// > positive. - pub fn update_remote_initial_window_size(&mut self, old: u32, new: u32) { + pub fn update_initial_recv_window_size(&mut self, old: u32, new: u32) { use self::State::*; use self::PeerState::*; @@ -147,7 +147,7 @@ impl State { } /// TODO Connection doesn't have an API for local updates yet. - pub fn update_local_initial_window_size(&mut self, _old: u32, _new: u32) { + pub fn update_initial_send_window_size(&mut self, _old: u32, _new: u32) { //use self::State::*; //use self::PeerState::*; unimplemented!() @@ -159,7 +159,7 @@ impl State { /// stream id. `Err` is returned if this is an invalid state transition. pub fn recv_headers(&mut self, eos: bool, - remote_window_size: u32) + initial_recv_window_size: u32) -> Result { use self::State::*; @@ -171,7 +171,7 @@ impl State { if eos { *self = HalfClosedRemote(local); } else { - *self = Open { local, remote: Data(FlowController::new(remote_window_size)) }; + *self = Open { local, remote: Data(FlowController::new(initial_recv_window_size)) }; } Ok(true) } @@ -191,7 +191,7 @@ impl State { if eos { *self = Closed; } else { - *self = HalfClosedLocal(Data(FlowController::new(remote_window_size))); + *self = HalfClosedLocal(Data(FlowController::new(initial_recv_window_size))); }; Ok(false) } @@ -204,22 +204,22 @@ impl State { } } - pub fn recv_data(&mut self, eos: bool, len: usize) -> Result<(), ConnectionError> { + pub fn recv_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> { use self::State::*; match *self { - Open { local, remote } => { + Open { local, mut remote } => { try!(remote.check_is_data(ProtocolError.into())); - try!(remote.check_window_size(len, FlowControlError.into())); + try!(remote.claim_window_size(len, FlowControlError.into())); if eos { *self = HalfClosedRemote(local); } Ok(()) } - HalfClosedLocal(remote) => { + HalfClosedLocal(mut remote) => { try!(remote.check_is_data(ProtocolError.into())); - try!(remote.check_window_size(len, FlowControlError.into())); + try!(remote.claim_window_size(len, FlowControlError.into())); if eos { *self = Closed; } @@ -238,7 +238,11 @@ impl State { /// /// Returns true if this state transition results in initializing the stream /// id. `Err` is returned if this is an invalid state transition. - pub fn send_headers(&mut self, eos: bool, local_window_size: u32) -> Result { + pub fn send_headers(&mut self, + eos: bool, + initial_send_window_size: u32) + -> Result + { use self::State::*; use self::PeerState::*; @@ -248,7 +252,7 @@ impl State { HalfClosedLocal(Headers) } else { Open { - local: Data(FlowController::new(local_window_size)), + local: Data(FlowController::new(initial_send_window_size)), remote: Headers, } }; @@ -262,7 +266,7 @@ impl State { *self = if eos { HalfClosedLocal(remote) } else { - let local = Data(FlowController::new(local_window_size)); + let local = Data(FlowController::new(initial_send_window_size)); Open { local, remote } }; @@ -275,7 +279,7 @@ impl State { *self = if eos { Closed } else { - HalfClosedRemote(Data(FlowController::new(local_window_size))) + HalfClosedRemote(Data(FlowController::new(initial_send_window_size))) }; Ok(false) @@ -289,22 +293,22 @@ impl State { } } - pub fn send_data(&mut self, eos: bool, len: usize) -> Result<(), ConnectionError> { + pub fn send_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> { use self::State::*; match *self { - Open { local, remote } => { + Open { mut local, remote } => { try!(local.check_is_data(UnexpectedFrameType.into())); - try!(local.check_window_size(len, FlowControlViolation.into())); + try!(local.claim_window_size(len, FlowControlViolation.into())); if eos { *self = HalfClosedLocal(remote); } Ok(()) } - HalfClosedRemote(local) => { + HalfClosedRemote(mut local) => { try!(local.check_is_data(UnexpectedFrameType.into())); - try!(local.check_window_size(len, FlowControlViolation.into())); + try!(local.claim_window_size(len, FlowControlViolation.into())); if eos { *self = Closed; } @@ -353,10 +357,10 @@ impl PeerState { } #[inline] - fn check_window_size(&self, len: usize, err: ConnectionError) -> Result<(), ConnectionError> { + fn claim_window_size(&mut self, sz: FrameSize, err: ConnectionError) -> Result<(), ConnectionError> { use self::PeerState::*; match self { - &Data(ref fc) if len <= fc.window_size() as usize=> Ok(()), + &mut Data(ref mut fc) => fc.claim_window(sz).map_err(|_| err), _ => Err(err), } }