From fc0a7eb898c42836e2caa057e09898e0c4365ece Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 4 Aug 2017 12:12:22 -0700 Subject: [PATCH] More work --- src/client.rs | 4 +- src/proto/connection.rs | 193 ++++++++++++++++++-------------- src/proto/streams/mod.rs | 1 + src/proto/streams/prioritize.rs | 34 +++++- src/proto/streams/recv.rs | 48 +++++++- src/proto/streams/send.rs | 9 ++ src/proto/streams/store.rs | 19 +++- src/proto/streams/stream.rs | 14 +++ src/proto/streams/streams.rs | 48 ++++++-- 9 files changed, 264 insertions(+), 106 deletions(-) diff --git a/src/client.rs b/src/client.rs index 94b043a..dae1040 100644 --- a/src/client.rs +++ b/src/client.rs @@ -126,8 +126,8 @@ impl fmt::Debug for Handshake impl Stream { /// Receive the HTTP/2.0 response, if it is ready. - pub fn poll_response(&mut self) -> Poll<(), ConnectionError> { - unimplemented!(); + pub fn poll_response(&mut self) -> Poll, ConnectionError> { + self.inner.poll_response() } /// Send data diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 3e3d724..b363c18 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -90,6 +90,112 @@ impl Connection Ok(().into()) } + /// Advances the internal state of the connection. + pub fn poll(&mut self) -> Poll, ConnectionError> { + use frame::Frame::*; + + loop { + // First, ensure that the `Connection` is able to receive a frame + try_ready!(self.poll_recv_ready()); + + trace!("polling codec"); + + let frame = match try!(self.codec.poll()) { + Async::Ready(frame) => frame, + Async::NotReady => { + // Flush any pending writes + let _ = try!(self.poll_complete()); + return Ok(Async::NotReady); + } + }; + + match frame { + Some(Headers(frame)) => { + trace!("recv HEADERS; frame={:?}", frame); + + if let Some(frame) = try!(self.streams.recv_headers(frame)) { + unimplemented!(); + } + + /* + // Update stream state while ensuring that the headers frame + // can be received. + if let Some(frame) = try!(self.streams.recv_headers(frame)) { + let frame = Self::convert_poll_message(frame)?; + return Ok(Some(frame).into()); + } + */ + } + Some(Data(frame)) => { + unimplemented!(); + /* + trace!("recv DATA; frame={:?}", frame); + try!(self.streams.recv_data(&frame)); + + let frame = Frame::Data { + id: frame.stream_id(), + end_of_stream: frame.is_end_stream(), + data: frame.into_payload(), + }; + + return Ok(Some(frame).into()); + */ + } + Some(Reset(frame)) => { + unimplemented!(); + /* + trace!("recv RST_STREAM; frame={:?}", frame); + try!(self.streams.recv_reset(&frame)); + + let frame = Frame::Reset { + id: frame.stream_id(), + error: frame.reason(), + }; + + return Ok(Some(frame).into()); + */ + } + Some(PushPromise(frame)) => { + unimplemented!(); + /* + trace!("recv PUSH_PROMISE; frame={:?}", frame); + try!(self.streams.recv_push_promise(frame)); + */ + } + Some(Settings(frame)) => { + trace!("recv SETTINGS; frame={:?}", frame); + self.settings.recv_settings(frame); + + // TODO: ACK must be sent THEN settings applied. + } + Some(Ping(frame)) => { + unimplemented!(); + /* + trace!("recv PING; frame={:?}", frame); + self.ping_pong.recv_ping(frame); + */ + } + Some(WindowUpdate(frame)) => { + unimplemented!(); + /* + trace!("recv WINDOW_UPDATE; frame={:?}", frame); + try!(self.streams.recv_window_update(frame)); + */ + } + None => { + unimplemented!(); + /* + trace!("codec closed"); + return Ok(Async::Ready(None)); + */ + } + } + } + + // TODO: Flush the write buffer + unimplemented!(); + } + /* pub fn send_data(self, id: StreamId, @@ -138,96 +244,17 @@ impl Connection /// This function is currently used by poll_complete, but at some point it /// will probably not be required. fn poll_send_ready(&mut self) -> Poll<(), ConnectionError> { + // TODO: Is this function needed? try_ready!(self.poll_recv_ready()); - // Ensure all window updates have been sent. - try_ready!(self.streams.send_pending_window_updates(&mut self.codec)); - Ok(().into()) } - /// Try to receive the next frame - fn recv_frame(&mut self) -> Poll>, ConnectionError> { - use frame::Frame::*; - - loop { - // First, ensure that the `Connection` is able to receive a frame - try_ready!(self.poll_recv_ready()); - - trace!("polling codec"); - - let frame = match try!(self.codec.poll()) { - Async::Ready(frame) => frame, - Async::NotReady => { - // Receiving new frames may depend on ensuring that the write buffer - // is clear (e.g. if window updates need to be sent), so `poll_complete` - // is called here. - let _ = try!(self.poll_complete()); - return Ok(Async::NotReady); - } - }; - - match frame { - Some(Headers(frame)) => { - trace!("recv HEADERS; frame={:?}", frame); - // Update stream state while ensuring that the headers frame - // can be received. - if let Some(frame) = try!(self.streams.recv_headers(frame)) { - let frame = Self::convert_poll_message(frame)?; - return Ok(Some(frame).into()); - } - } - Some(Data(frame)) => { - trace!("recv DATA; frame={:?}", frame); - try!(self.streams.recv_data(&frame)); - - let frame = Frame::Data { - id: frame.stream_id(), - end_of_stream: frame.is_end_stream(), - data: frame.into_payload(), - }; - - return Ok(Some(frame).into()); - } - Some(Reset(frame)) => { - trace!("recv RST_STREAM; frame={:?}", frame); - try!(self.streams.recv_reset(&frame)); - - let frame = Frame::Reset { - id: frame.stream_id(), - error: frame.reason(), - }; - - return Ok(Some(frame).into()); - } - Some(PushPromise(frame)) => { - trace!("recv PUSH_PROMISE; frame={:?}", frame); - try!(self.streams.recv_push_promise(frame)); - } - Some(Settings(frame)) => { - trace!("recv SETTINGS; frame={:?}", frame); - self.settings.recv_settings(frame); - - // TODO: ACK must be sent THEN settings applied. - } - Some(Ping(frame)) => { - trace!("recv PING; frame={:?}", frame); - self.ping_pong.recv_ping(frame); - } - Some(WindowUpdate(frame)) => { - trace!("recv WINDOW_UPDATE; frame={:?}", frame); - try!(self.streams.recv_window_update(frame)); - } - None => { - trace!("codec closed"); - return Ok(Async::Ready(None)); - } - } - } - } - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { try_ready!(self.poll_send_ready()); + + // Ensure all window updates have been sent. + try_ready!(self.streams.poll_complete(&mut self.codec)); try_ready!(self.codec.poll_complete()); Ok(().into()) diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 2b5429e..c03ea99 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -25,6 +25,7 @@ use error::Reason::*; use error::User::*; use http::{Request, Response}; +use bytes::Bytes; #[derive(Debug)] pub struct Config { diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 0e0dbfd..4acf644 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -14,7 +14,9 @@ struct Indices { tail: store::Key, } -impl Prioritize { +impl Prioritize + where B: Buf, +{ pub fn new() -> Prioritize { Prioritize { pending_send: None, @@ -58,4 +60,34 @@ impl Prioritize { stream.is_pending_send = true; } + + pub fn poll_complete(&mut self, + store: &mut Store, + dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + { + loop { + // Ensure codec is ready + try_ready!(dst.poll_ready()); + + match self.pop_frame(store) { + Some(frame) => { + // TODO: data frames should be handled specially... + let res = dst.start_send(frame)?; + + // We already verified that `dst` is ready to accept the + // write + assert!(res.is_ready()); + } + None => break, + } + } + + Ok(().into()) + } + + fn pop_frame(&mut self, store: &mut Store) -> Option> { + unimplemented!(); + } } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 99a7a39..0de3d4e 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,4 +1,4 @@ -use {frame, ConnectionError}; +use {client, frame, ConnectionError}; use proto::*; use super::*; @@ -23,6 +23,9 @@ pub(super) struct Recv { pending_window_updates: VecDeque, + /// Holds frames that are waiting to be read + buffer: Buffer, + /// Refused StreamId, this represents a frame that must be sent out. refused: Option, @@ -40,6 +43,7 @@ impl Recv init_window_sz: config.init_remote_window_sz, flow_control: FlowControl::new(config.init_remote_window_sz), pending_window_updates: VecDeque::new(), + buffer: Buffer::new(), refused: None, _p: PhantomData, } @@ -67,10 +71,24 @@ impl Recv } /// Transition the stream state based on receiving headers - pub fn recv_headers(&mut self, stream: &mut Stream, eos: bool) - -> Result<(), ConnectionError> + pub fn recv_headers(&mut self, + frame: frame::Headers, + stream: &mut store::Ptr) + -> Result, ConnectionError> { - stream.state.recv_open(self.init_window_sz, eos) + stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?; + + // Only servers can receive a headers frame that initiates the stream. + // This is verified in `Streams` before calling this function. + if P::is_server() { + Ok(Some(frame)) + } else { + // Push the frame onto the recv buffer + stream.pending_recv.push_back(&mut self.buffer, frame.into()); + stream.notify_recv(); + + Ok(None) + } } pub fn recv_eos(&mut self, stream: &mut Stream) @@ -233,3 +251,25 @@ impl Recv unimplemented!(); } } + +impl Recv + where B: Buf, +{ + pub fn poll_response(&mut self, stream: &mut store::Ptr) + -> Poll, ConnectionError> { + // If the buffer is not empty, then the first frame must be a HEADERS + // frame or the user violated the contract. + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Frame::Headers(v)) => { + // TODO: This error should probably be caught on receipt of the + // frame vs. now. + Ok(client::Peer::convert_poll_message(v)?.into()) + } + Some(frame) => unimplemented!(), + None => { + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } + } + } +} diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 6ed4fdd..a6d8031 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -151,6 +151,15 @@ impl Send Ok(()) } + pub fn poll_complete(&mut self, + store: &mut Store, + dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + { + self.prioritize.poll_complete(store, dst) + } + /// Get pending window updates pub fn poll_window_update(&mut self, streams: &mut Store) -> Poll diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index b19c755..d59eb5b 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -47,6 +47,13 @@ impl Store { } } + pub fn resolve(&mut self, key: Key) -> Ptr { + Ptr { + key: key, + store: self, + } + } + pub fn find_mut(&mut self, id: &StreamId) -> Option<&mut Stream> { if let Some(handle) = self.ids.get(id) { Some(&mut self.slab[*handle]) @@ -117,6 +124,10 @@ impl<'a, B: 'a> ops::DerefMut for Ptr<'a, B> { // ===== impl OccupiedEntry ===== impl<'a, B> OccupiedEntry<'a, B> { + pub fn key(&self) -> Key { + Key(*self.ids.get()) + } + pub fn get(&self) -> &Stream { &self.slab[*self.ids.get()] } @@ -133,13 +144,13 @@ impl<'a, B> OccupiedEntry<'a, B> { // ===== impl VacantEntry ===== // impl<'a, B> VacantEntry<'a, B> { - pub fn insert(self, value: Stream) -> &'a mut Stream { + pub fn insert(self, value: Stream) -> Key { // Insert the value in the slab - let handle = self.slab.insert(value); + let key = self.slab.insert(value); // Insert the handle in the ID map - self.ids.insert(handle); + self.ids.insert(key); - &mut self.slab[handle] + Key(key) } } diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 0689d3c..588edcb 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -5,6 +5,12 @@ pub(super) struct Stream { /// Current state of the stream pub state: State, + /// Frames pending for this stream to read + pub pending_recv: buffer::Deque, + + /// Task tracking receiving frames + pub recv_task: Option, + /// Frames pending for this stream being sent to the socket pub pending_send: buffer::Deque, @@ -19,6 +25,8 @@ impl Stream { pub fn new() -> Stream { Stream { state: State::default(), + pending_recv: buffer::Deque::new(), + recv_task: None, pending_send: buffer::Deque::new(), next_pending_send: None, is_pending_send: false, @@ -32,4 +40,10 @@ impl Stream { pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> { self.state.recv_flow_control() } + + pub fn notify_recv(&mut self) { + if let Some(ref mut task) = self.recv_task { + task.notify(); + } + } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index b5b3ded..231ea01 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -15,7 +15,7 @@ pub struct Streams { #[derive(Debug)] pub struct StreamRef { inner: Arc>>, - id: StreamId, + key: store::Key, } /// Fields needed to manage state related to managing the set of streams. This @@ -53,6 +53,7 @@ impl Streams } } + /// Process inbound headers pub fn recv_headers(&mut self, frame: frame::Headers) -> Result, ConnectionError> { @@ -60,8 +61,8 @@ impl Streams let mut me = self.inner.lock().unwrap(); let me = &mut *me; - let stream = match me.store.find_entry(id) { - Entry::Occupied(e) => e.into_mut(), + let key = match me.store.find_entry(id) { + Entry::Occupied(e) => e.key(), Entry::Vacant(e) => { // Trailers cannot open a stream. Trailers are header frames // that do not contain pseudo headers. Requests MUST contain a @@ -78,22 +79,28 @@ impl Streams } }; - if frame.is_trailers() { + let mut stream = me.store.resolve(key); + + let ret = if frame.is_trailers() { + unimplemented!(); + /* if !frame.is_end_stream() { // TODO: What error should this return? unimplemented!(); } try!(me.actions.recv.recv_eos(stream)); + */ } else { - try!(me.actions.recv.recv_headers(stream, frame.is_end_stream())); - } + try!(me.actions.recv.recv_headers(frame, &mut stream)) + }; + // TODO: move this into a fn if stream.state.is_closed() { me.actions.dec_num_streams(id); } - Ok(Some(frame)) + Ok(ret) } pub fn recv_data(&mut self, frame: &frame::Data) @@ -241,16 +248,20 @@ impl Streams me.actions.recv.send_pending_refusal(dst) } - pub fn send_pending_window_updates(&mut self, dst: &mut Codec) + pub fn poll_complete(&mut self, dst: &mut Codec) -> Poll<(), ConnectionError> where T: AsyncWrite, { let mut me = self.inner.lock().unwrap(); let me = &mut *me; + + // TODO: sending window updates should be part of Prioritize + /* try_ready!(me.actions.recv.send_connection_window_update(dst)); try_ready!(me.actions.recv.send_stream_window_update(&mut me.store, dst)); + */ - Ok(().into()) + me.actions.send.poll_complete(&mut me.store, dst) } } @@ -260,7 +271,7 @@ impl Streams pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) -> Result, ConnectionError> { - let id = { + let key = { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -279,16 +290,29 @@ impl Streams // closed state. debug_assert!(!stream.state.is_closed()); - id + stream.key() }; Ok(StreamRef { inner: self.inner.clone(), - id: id, + key: key, }) } } +impl StreamRef + where B: Buf, +{ + pub fn poll_response(&mut self) -> Poll, ConnectionError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.poll_response(&mut stream) + } +} + impl Actions where P: Peer, B: Buf,