From 71acfe3961a5dacfeb25929812d4ed4a3f2d2142 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 7 Aug 2017 12:17:52 -0700 Subject: [PATCH] Start hooking up data receiving --- src/client.rs | 33 +++++++++--------- src/frame/mod.rs | 9 +++++ src/proto/connection.rs | 13 +------ src/proto/mod.rs | 2 +- src/proto/streams/buffer.rs | 49 ++++++++++++++++++++++++++ src/proto/streams/mod.rs | 2 +- src/proto/streams/recv.rs | 39 ++++++++++++++++++++- src/proto/streams/streams.rs | 67 +++++++++++++++++++++++++++++++++++- 8 files changed, 181 insertions(+), 33 deletions(-) diff --git a/src/client.rs b/src/client.rs index c980b59..7ae70c0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -23,12 +23,21 @@ pub struct Client { connection: Connection, } -/// Client half of an active HTTP/2.0 stream. #[derive(Debug)] pub struct Stream { inner: proto::StreamRef, } +#[derive(Debug)] +pub struct Body { + inner: proto::StreamRef, +} + +#[derive(Debug)] +pub struct Chunk { + inner: proto::Chunk, +} + impl Client where T: AsyncRead + AsyncWrite + 'static, { @@ -90,19 +99,6 @@ impl Client } } -impl Future for Client - // TODO: Get rid of 'static - where T: AsyncRead + AsyncWrite + 'static, - B: IntoBuf + 'static, -{ - type Item = (); - type Error = ConnectionError; - - fn poll(&mut self) -> Poll<(), ConnectionError> { - self.connection.poll() - } -} - impl fmt::Debug for Client where T: fmt::Debug, B: fmt::Debug + IntoBuf, @@ -140,8 +136,11 @@ impl fmt::Debug for Handshake impl Stream { /// Receive the HTTP/2.0 response, if it is ready. - pub fn poll_response(&mut self) -> Poll, ConnectionError> { - self.inner.poll_response() + pub fn poll_response(&mut self) -> Poll>, ConnectionError> { + let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); + let body = Body { inner: self.inner.clone() }; + + Ok(Response::from_parts(parts, body).into()) } /// Send data @@ -160,7 +159,7 @@ impl Stream { } impl Future for Stream { - type Item = Response<()>; + type Item = Response>; type Error = ConnectionError; fn poll(&mut self) -> Poll { diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 2480a71..b7a8022 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -67,6 +67,15 @@ pub enum Frame { } impl Frame { + /// Returns true if the frame is a DATA frame. + pub fn is_data(&self) -> bool { + use self::Frame::*; + + match *self { + Data(..) => true, + _ => false, + } + } } impl fmt::Debug for Frame { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index e8b3cd6..a28ce80 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -127,19 +127,8 @@ impl Connection */ } Some(Data(frame)) => { - unimplemented!(); - /* trace!("recv DATA; frame={:?}", frame); - try!(self.streams.recv_data(&frame)); - - let frame = Frame::Data { - id: frame.stream_id(), - end_of_stream: frame.is_end_stream(), - data: frame.into_payload(), - }; - - return Ok(Some(frame).into()); - */ + try!(self.streams.recv_data(frame)); } Some(Reset(frame)) => { unimplemented!(); diff --git a/src/proto/mod.rs b/src/proto/mod.rs index aaca6db..7c842d9 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -6,7 +6,7 @@ mod settings; mod streams; pub use self::connection::Connection; -pub use self::streams::{Streams, StreamRef}; +pub use self::streams::{Streams, StreamRef, Chunk}; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 8aa1c99..077d10d 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -88,4 +88,53 @@ impl Deque { None => None, } } + + pub fn take_while(&mut self, buf: &mut Buffer, mut f: F) -> Self + where F: FnMut(&Frame) -> bool + { + match self.indices { + Some(mut idxs) => { + if !f(&buf.slab[idxs.head].frame) { + return Deque::new(); + } + + let head = idxs.head; + let mut tail = idxs.head; + + loop { + let next = match buf.slab[tail].next { + Some(next) => next, + None => { + self.indices = None; + return Deque { + indices: Some(idxs), + _p: PhantomData, + }; + } + }; + + if !f(&buf.slab[next].frame) { + // Split the linked list + buf.slab[tail].next = None; + + self.indices = Some(Indices { + head: next, + tail: idxs.tail, + }); + + return Deque { + indices: Some(Indices { + head: head, + tail: tail, + }), + _p: PhantomData, + } + } + + tail = next; + } + } + None => Deque::new(), + } + } } diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index c03ea99..1dfe92b 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -8,7 +8,7 @@ mod store; mod stream; mod streams; -pub use self::streams::{Streams, StreamRef}; +pub use self::streams::{Streams, StreamRef, Chunk}; use self::buffer::Buffer; use self::flow_control::FlowControl; diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 0de3d4e..8a6d4d1 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -32,6 +32,12 @@ pub(super) struct Recv { _p: PhantomData<(P, B)>, } +#[derive(Debug)] +pub(super) struct Chunk { + /// Data frames pending receival + pub pending_recv: buffer::Deque, +} + impl Recv where P: Peer, B: Buf, @@ -98,7 +104,7 @@ impl Recv } pub fn recv_data(&mut self, - frame: &frame::Data, + frame: frame::Data, stream: &mut Stream) -> Result<(), ConnectionError> { @@ -130,6 +136,10 @@ impl Recv try!(stream.state.recv_close()); } + // Push the frame onto the recv buffer + stream.pending_recv.push_back(&mut self.buffer, frame.into()); + stream.notify_recv(); + Ok(()) } @@ -218,6 +228,33 @@ impl Recv Ok(().into()) } + + pub fn poll_chunk(&mut self, stream: &mut Stream) + -> Poll, ConnectionError> + { + let frames = stream.pending_recv + .take_while(&mut self.buffer, |frame| frame.is_data()); + + if frames.is_empty() { + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } else { + Ok(Some(Chunk { + pending_recv: frames, + }).into()) + } + } + + pub fn pop_bytes(&mut self, chunk: &mut Chunk) -> Option { + match chunk.pending_recv.pop_front(&mut self.buffer) { + Some(Frame::Data(frame)) => { + Some(frame.into_payload()) + } + None => None, + _ => panic!("unexpected frame type"), + } + } + /// Send stream level window update pub fn send_stream_window_update(&mut self, streams: &mut Store, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 8134b80..23a44be 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -18,6 +18,15 @@ pub struct StreamRef { key: store::Key, } +#[derive(Debug)] +pub struct Chunk + where P: Peer, + B: Buf, +{ + inner: Arc>>, + recv: recv::Chunk, +} + /// Fields needed to manage state related to managing the set of streams. This /// is mostly split out to make ownership happy. /// @@ -103,7 +112,7 @@ impl Streams Ok(ret) } - pub fn recv_data(&mut self, frame: &frame::Data) + pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), ConnectionError> { let id = frame.stream_id(); @@ -305,6 +314,34 @@ impl Streams } } +// ===== impl StreamRef ===== + +impl StreamRef + where P: Peer, + B: Buf, +{ + pub fn poll_data(&mut self) -> Poll>, ConnectionError> { + let recv = { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + try_ready!(me.actions.recv.poll_chunk(&mut stream)) + }; + + // Convert to a chunk + let chunk = recv.map(|recv| { + Chunk { + inner: self.inner.clone(), + recv: recv, + } + }); + + Ok(chunk.into()) + } +} + impl StreamRef where B: Buf, { @@ -318,6 +355,34 @@ impl StreamRef } } + + +impl Clone for StreamRef { + fn clone(&self) -> Self { + StreamRef { + inner: self.inner.clone(), + key: self.key.clone(), + } + } +} + +// ===== impl Chunk ===== + +impl Drop for Chunk + where P: Peer, + B: Buf, +{ + fn drop(&mut self) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + while let Some(_) = me.actions.recv.pop_bytes(&mut self.recv) { + } + } +} + +// ===== impl Actions ===== + impl Actions where P: Peer, B: Buf,