From 8a15663ed234b94dbb49385fe0e1e7e5c821fd6c Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 11 Aug 2017 16:57:51 -0700 Subject: [PATCH] Progress towards allowing large writes --- src/error.rs | 4 + src/frame/data.rs | 15 ++++ src/frame/mod.rs | 18 ++++ src/proto/codec.rs | 9 ++ src/proto/connection.rs | 51 +++-------- src/proto/framed_read.rs | 4 + src/proto/framed_write.rs | 77 +++++++++++------ src/proto/mod.rs | 9 +- src/proto/settings.rs | 9 +- src/proto/streams/mod.rs | 3 +- src/proto/streams/prioritize.rs | 147 ++++++++++++++++++++++++++------ src/proto/streams/recv.rs | 2 +- src/proto/streams/send.rs | 4 +- src/proto/streams/streams.rs | 10 +-- 14 files changed, 254 insertions(+), 108 deletions(-) diff --git a/src/error.rs b/src/error.rs index b5294b3..815402e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -78,6 +78,9 @@ pub enum User { /// transmit a Data frame to the remote. FlowControlViolation, + /// The payload size is too big + PayloadTooBig, + /// The connection state is corrupt and the connection should be dropped. Corrupt, @@ -128,6 +131,7 @@ macro_rules! user_desc { StreamReset(_) => concat!($prefix, "frame sent on reset stream"), Corrupt => concat!($prefix, "connection state corrupt"), Rejected => concat!($prefix, "stream would exceed remote max concurrency"), + PayloadTooBig => concat!($prefix, "payload too big"), } }); } diff --git a/src/frame/data.rs b/src/frame/data.rs index ab02bad..7c06e41 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -52,9 +52,24 @@ impl Data { &self.data } + pub fn payload_mut(&mut self) -> &mut T { + &mut self.data + } + pub fn into_payload(self) -> T { self.data } + + pub fn map(self, f: F) -> Data + where F: FnOnce(T) -> U, + { + Data { + stream_id: self.stream_id, + data: f(self.data), + flags: self.flags, + pad_len: self.pad_len, + } + } } impl Data { diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 9b2ab7e..45594b4 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -80,6 +80,24 @@ impl Frame { _ => false, } } + + pub fn map(self, f: F) -> Frame + where F: FnOnce(T) -> U + { + use self::Frame::*; + + match self { + Data(frame) => frame.map(f).into(), + Headers(frame) => frame.into(), + Priority(frame) => frame.into(), + PushPromise(frame) => frame.into(), + Settings(frame) => frame.into(), + Ping(frame) => frame.into(), + GoAway(frame) => frame.into(), + WindowUpdate(frame) => frame.into(), + Reset(frame) => frame.into(), + } + } } impl Frame { diff --git a/src/proto/codec.rs b/src/proto/codec.rs index 460dec4..ae20194 100644 --- a/src/proto/codec.rs +++ b/src/proto/codec.rs @@ -12,6 +12,15 @@ impl Codec { self.framed_write().apply_remote_settings(frame); } + /// Takes the data payload value that was fully written to the socket + pub(crate) fn take_last_data_frame(&mut self) -> Option> { + self.framed_write().take_last_data_frame() + } + + pub fn max_send_frame_size(&self) -> usize { + self.inner.get_ref().max_frame_size() + } + fn framed_read(&mut self) -> &mut FramedRead> { &mut self.inner } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 217ded2..b06eb2c 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -12,15 +12,12 @@ use std::marker::PhantomData; /// An H2 connection #[derive(Debug)] -pub struct Connection { +pub(crate) struct Connection { // Codec - codec: Codec, - - // TODO: Remove - ping_pong: PingPong, + codec: Codec>, + ping_pong: PingPong>, settings: Settings, streams: Streams, - _phantom: PhantomData

, } @@ -29,7 +26,7 @@ impl Connection P: Peer, B: IntoBuf, { - pub fn new(codec: Codec) -> Connection { + pub fn new(codec: Codec>) -> Connection { // TODO: Actually configure let streams = Streams::new::

(streams::Config { max_remote_initiated: None, @@ -49,10 +46,11 @@ impl Connection /// Returns `Ready` when the connection is ready to receive a frame. pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.poll_send_ready()); - - // TODO: Once there is write buffering, this shouldn't be needed - try_ready!(self.codec.poll_ready()); + // The order of these calls don't really matter too much as only one + // should have pending work. + try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); + try_ready!(self.settings.send_pending_ack(&mut self.codec, &mut self.streams)); + try_ready!(self.streams.send_pending_refusal(&mut self.codec)); Ok(().into()) } @@ -74,7 +72,7 @@ impl Connection loop { // First, ensure that the `Connection` is able to receive a frame - try_ready!(self.poll_recv_ready()); + try_ready!(self.poll_ready()); trace!("polling codec"); @@ -137,38 +135,11 @@ impl Connection } } - // ===== Private ===== - - /// Returns `Ready` when the `Connection` is ready to receive a frame from - /// the socket. - fn poll_recv_ready(&mut self) -> Poll<(), ConnectionError> { - // The order of these calls don't really matter too much as only one - // should have pending work. - try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); - try_ready!(self.settings.send_pending_ack(&mut self.codec, &mut self.streams)); - try_ready!(self.streams.send_pending_refusal(&mut self.codec)); - - Ok(().into()) - } - - /// Returns `Ready` when the `Connection` is ready to accept a frame from - /// the user - /// - /// This function is currently used by poll_complete, but at some point it - /// will probably not be required. - fn poll_send_ready(&mut self) -> Poll<(), ConnectionError> { - // TODO: Is this function needed? - try_ready!(self.poll_recv_ready()); - - Ok(().into()) - } - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.poll_send_ready()); + try_ready!(self.poll_ready()); // Ensure all window updates have been sent. try_ready!(self.streams.poll_complete(&mut self.codec)); - try_ready!(self.codec.poll_complete()); Ok(().into()) } diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index d6774d6..c5d1e40 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -147,6 +147,10 @@ impl FramedRead { Ok(Some(frame)) } + pub fn get_ref(&self) -> &T { + self.inner.get_ref() + } + pub fn get_mut(&mut self) -> &mut T { self.inner.get_mut() } diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index c95577c..6869b0f 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -1,4 +1,5 @@ use {hpack, ConnectionError, FrameSize}; +use error::User::*; use frame::{self, Frame}; use futures::*; @@ -17,24 +18,23 @@ pub struct FramedWrite { hpack: hpack::Encoder, /// Write buffer + /// + /// TODO: Should this be a ring buffer? buf: Cursor, /// Next frame to encode next: Option>, + /// Last data frame + last_data_frame: Option>, + /// Max frame size, this is specified by the peer max_frame_size: FrameSize, } #[derive(Debug)] enum Next { - Data { - /// Length of the current frame being written - frame_len: usize, - - /// Data frame to encode - data: frame::Data, - }, + Data(frame::Data), Continuation(frame::Continuation), } @@ -60,6 +60,7 @@ impl FramedWrite hpack: hpack::Encoder::default(), buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)), next: None, + last_data_frame: None, max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, } } @@ -82,20 +83,27 @@ impl FramedWrite } fn is_empty(&self) -> bool { - self.next.is_none() && !self.buf.has_remaining() - } - - fn frame_len(&self, data: &frame::Data) -> usize { - cmp::min(self.max_frame_size as usize, data.payload().remaining()) + match self.next { + Some(Next::Data(ref frame)) => !frame.payload().has_remaining(), + _ => !self.buf.has_remaining(), + } } } impl FramedWrite { + pub fn max_frame_size(&self) -> usize { + self.max_frame_size as usize + } + pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { if let Some(val) = settings.max_frame_size() { self.max_frame_size = val; } } + + pub fn take_last_data_frame(&mut self) -> Option> { + self.last_data_frame.take() + } } impl Sink for FramedWrite @@ -116,24 +124,30 @@ impl Sink for FramedWrite match item { Frame::Data(mut v) => { - if v.payload().remaining() >= CHAIN_THRESHOLD { + // Ensure that the payload is not greater than the max frame. + let len = v.payload().remaining(); + + if len > self.max_frame_size() { + return Err(PayloadTooBig.into()); + } + + if len >= CHAIN_THRESHOLD { let head = v.head(); - let len = self.frame_len(&v); // Encode the frame head to the buffer head.encode(len, self.buf.get_mut()); // Save the data frame - self.next = Some(Next::Data { - frame_len: len, - data: v, - }); + self.next = Some(Next::Data(v)); } else { v.encode_chunk(self.buf.get_mut()); // The chunk has been fully encoded, so there is no need to // keep it around assert_eq!(v.payload().remaining(), 0, "chunk not fully encoded"); + + // Save off the last frame... + self.last_data_frame = Some(v); } } Frame::Headers(v) => { @@ -179,16 +193,27 @@ impl Sink for FramedWrite fn poll_complete(&mut self) -> Poll<(), ConnectionError> { trace!("poll_complete"); - // TODO: implement - match self.next { - Some(Next::Data { .. }) => unimplemented!(), - _ => {} + while !self.is_empty() { + match self.next { + Some(Next::Data(ref mut frame)) => { + let mut buf = self.buf.by_ref().chain(frame.payload_mut()); + try_ready!(self.inner.write_buf(&mut buf)); + } + _ => { + try_ready!(self.inner.write_buf(&mut self.buf)); + } + } } - // As long as there is data to write, try to write it! - while !self.is_empty() { - trace!("writing {}", self.buf.remaining()); - try_ready!(self.inner.write_buf(&mut self.buf)); + // The data frame has been written, so unset it + match self.next.take() { + Some(Next::Data(frame)) => { + self.last_data_frame = Some(frame); + } + Some(Next::Continuation(frame)) => { + unimplemented!(); + } + None => {} } trace!("flushing buffer"); diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 9be1465..9b39877 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -6,14 +6,15 @@ mod ping_pong; mod settings; mod streams; -pub use self::connection::Connection; -pub use self::streams::{Streams, StreamRef, Chunk}; +pub(crate) use self::connection::Connection; +pub(crate) use self::streams::{Streams, StreamRef, Chunk}; use self::codec::Codec; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; use self::ping_pong::PingPong; use self::settings::Settings; +use self::streams::Prioritized; use {StreamId, ConnectionError}; use error::Reason; @@ -63,7 +64,7 @@ pub const MAX_WINDOW_SIZE: WindowSize = ::std::u32::MAX; /// When the server is performing the handshake, it is able to only send /// `Settings` frames and is expected to receive the client preface as a byte /// stream. To represent this, `Settings>` is returned. -pub fn framed_write(io: T) -> FramedWrite +pub(crate) fn framed_write(io: T) -> FramedWrite where T: AsyncRead + AsyncWrite, B: Buf, { @@ -71,7 +72,7 @@ pub fn framed_write(io: T) -> FramedWrite } /// Create a full H2 transport from the server handshaker -pub fn from_framed_write(framed_write: FramedWrite) +pub(crate) fn from_framed_write(framed_write: FramedWrite>) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 5952b31..31ee017 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -2,7 +2,7 @@ use {frame, ConnectionError}; use proto::*; #[derive(Debug)] -pub struct Settings { +pub(crate) struct Settings { /// Received SETTINGS frame pending processing. The ACK must be written to /// the socket first then the settings applied **before** receiving any /// further frames. @@ -26,12 +26,13 @@ impl Settings { } } - pub fn send_pending_ack(&mut self, - dst: &mut Codec, - streams: &mut Streams) + pub fn send_pending_ack(&mut self, + dst: &mut Codec, + streams: &mut Streams) -> Poll<(), ConnectionError> where T: AsyncWrite, B: Buf, + C: Buf, { if let Some(ref settings) = self.pending { let frame = frame::Settings::ack(); diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 1dfe92b..d53fdc9 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -8,7 +8,8 @@ mod store; mod stream; mod streams; -pub use self::streams::{Streams, StreamRef, Chunk}; +pub(crate) use self::streams::{Streams, StreamRef, Chunk}; +pub(crate) use self::prioritize::Prioritized; use self::buffer::Buffer; use self::flow_control::FlowControl; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 6777eff..b9644d9 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -22,6 +22,17 @@ pub(super) struct Prioritize { conn_task: Option, } +#[derive(Debug)] +pub(crate) struct Prioritized { + // The buffer + inner: B, + + // The stream that this is associated with + stream: store::Key, +} + +// ===== impl Prioritize ===== + impl Prioritize where B: Buf, { @@ -61,40 +72,56 @@ impl Prioritize pub fn queue_frame(&mut self, frame: Frame, stream: &mut store::Ptr) + { + if self.queue_frame2(frame, stream) { + // Notification required + if let Some(ref task) = self.conn_task { + task.notify(); + } + } + } + + /// Queue frame without actually notifying. Returns ture if the queue was + /// succesfful. + fn queue_frame2(&mut self, frame: Frame, stream: &mut store::Ptr) + -> bool { self.buffered_data += frame.flow_len(); // queue the frame in the buffer stream.pending_send.push_back(&mut self.buffer, frame); - if stream.is_pending_send { - debug_assert!(!self.pending_send.is_empty()); - - // Already queued to have frame processed. - return; - } - // Queue the stream - push_sender(&mut self.pending_send, stream); + !push_sender(&mut self.pending_send, stream) + } - if let Some(ref task) = self.conn_task { - task.notify(); - } + /// Push the frame to the front of the stream's deque, scheduling the + /// steream if needed. + fn push_back_frame(&mut self, frame: Frame, stream: &mut store::Ptr) { + // Push the frame to the front of the stream's deque + stream.pending_send.push_front(&mut self.buffer, frame); + + // If needed, schedule the sender + push_sender(&mut self.pending_capacity, stream); } pub fn poll_complete(&mut self, store: &mut Store, - dst: &mut Codec) + dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, { + // Track the task self.conn_task = Some(task::current()); + // Ensure codec is ready + try_ready!(dst.poll_ready()); + + // Reclaim any frame that has previously been written + self.reclaim_frame(store, dst); + trace!("poll_complete"); loop { - // Ensure codec is ready - try_ready!(dst.poll_ready()); - match self.pop_frame(store) { Some(frame) => { trace!("writing frame={:?}", frame); @@ -106,15 +133,31 @@ impl Prioritize // We already verified that `dst` is ready to accept the // write assert!(res.is_ready()); + + // Ensure the codec is ready to try the loop again. + try_ready!(dst.poll_ready()); + + // Because, always try to reclaim... + self.reclaim_frame(store, dst); + + } + None => { + // Try to flush the codec. + try_ready!(dst.poll_complete()); + + // This might release a data frame... + if !self.reclaim_frame(store, dst) { + return Ok(().into()); + } + + // No need to poll ready as poll_complete() does this for + // us... } - None => break, } } - - Ok(().into()) } - fn pop_frame(&mut self, store: &mut Store) -> Option> { + fn pop_frame(&mut self, store: &mut Store) -> Option>> { loop { match self.pop_sender(store) { Some(mut stream) => { @@ -124,11 +167,7 @@ impl Prioritize if len > self.flow_control.effective_window_size() as usize { // TODO: This could be smarter... - stream.pending_send.push_front(&mut self.buffer, frame.into()); - - // Push the stream onto the list of streams - // waiting for connection capacity - push_sender(&mut self.pending_capacity, &mut stream); + self.push_back_frame(frame.into(), &mut stream); // Try again w/ the next stream continue; @@ -143,6 +182,14 @@ impl Prioritize push_sender(&mut self.pending_send, &mut stream); } + // Add prioritization logic + let frame = frame.map(|buf| { + Prioritized { + inner: buf, + stream: stream.key(), + } + }); + return Some(frame); } None => return None, @@ -174,10 +221,58 @@ impl Prioritize } } } + + fn reclaim_frame(&mut self, + store: &mut Store, + dst: &mut Codec>) -> bool + { + // First check if there are any data chunks to take back + if let Some(frame) = dst.take_last_data_frame() { + let mut stream = store.resolve(frame.payload().stream); + + let frame = frame.map(|prioritized| { + // TODO: Ensure fully written + prioritized.inner + }); + + self.push_back_frame(frame.into(), &mut stream); + + true + } else { + false + } + } } -fn push_sender(list: &mut store::List, stream: &mut store::Ptr) { - debug_assert!(!stream.is_pending_send); +/// Push the stream onto the `pending_send` list. Returns true if the sender was +/// not already queued. +fn push_sender(list: &mut store::List, stream: &mut store::Ptr) + -> bool +{ + if stream.is_pending_send { + return false; + } + list.push::(stream); stream.is_pending_send = true; + + true +} + +// ===== impl Prioritized ===== + +impl Buf for Prioritized + where B: Buf, +{ + fn remaining(&self) -> usize { + self.inner.remaining() + } + + fn bytes(&self) -> &[u8] { + self.inner.bytes() + } + + fn advance(&mut self, cnt: usize) { + self.inner.advance(cnt) + } } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 3af49c0..92ee567 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -353,7 +353,7 @@ impl Recv where B: Buf { } /// Send any pending refusals. - pub fn send_pending_refusal(&mut self, dst: &mut Codec) + pub fn send_pending_refusal(&mut self, dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, { diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 565c871..e9ae6c9 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -146,7 +146,7 @@ impl Send where B: Buf { pub fn poll_complete(&mut self, store: &mut Store, - dst: &mut Codec) + dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, { @@ -316,6 +316,8 @@ impl Send where B: Buf { } else { stream.unadvertised_send_window -= dec; } + + unimplemented!(); } }); } else if val > old_val { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 86b016f..ff3a0b7 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -8,19 +8,19 @@ use std::sync::{Arc, Mutex}; // TODO: All the VecDeques should become linked lists using the State // values. #[derive(Debug)] -pub struct Streams { +pub(crate) struct Streams { inner: Arc>>, } /// Reference to the stream state #[derive(Debug)] -pub struct StreamRef { +pub(crate) struct StreamRef { inner: Arc>>, key: store::Key, } #[derive(Debug)] -pub struct Chunk +pub(crate) struct Chunk where B: Buf, { inner: Arc>>, @@ -231,7 +231,7 @@ impl Streams Ok(()) } - pub fn send_pending_refusal(&mut self, dst: &mut Codec) + pub fn send_pending_refusal(&mut self, dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, { @@ -240,7 +240,7 @@ impl Streams me.actions.recv.send_pending_refusal(dst) } - pub fn poll_complete(&mut self, dst: &mut Codec) + pub fn poll_complete(&mut self, dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, {