Start hooking up data receiving

This commit is contained in:
Carl Lerche
2017-08-07 12:17:52 -07:00
parent d918215397
commit 71acfe3961
8 changed files with 181 additions and 33 deletions

View File

@@ -23,12 +23,21 @@ pub struct Client<T, B: IntoBuf> {
connection: Connection<T, Peer, B>,
}
/// Client half of an active HTTP/2.0 stream.
#[derive(Debug)]
pub struct Stream<B: IntoBuf> {
inner: proto::StreamRef<Peer, B::Buf>,
}
#[derive(Debug)]
pub struct Body<B: IntoBuf> {
inner: proto::StreamRef<Peer, B::Buf>,
}
#[derive(Debug)]
pub struct Chunk<B: IntoBuf> {
inner: proto::Chunk<Peer, B::Buf>,
}
impl<T> Client<T, Bytes>
where T: AsyncRead + AsyncWrite + 'static,
{
@@ -90,19 +99,6 @@ impl<T, B> Client<T, B>
}
}
impl<T, B> Future for Client<T, B>
// TODO: Get rid of 'static
where T: AsyncRead + AsyncWrite + 'static,
B: IntoBuf + 'static,
{
type Item = ();
type Error = ConnectionError;
fn poll(&mut self) -> Poll<(), ConnectionError> {
self.connection.poll()
}
}
impl<T, B> fmt::Debug for Client<T, B>
where T: fmt::Debug,
B: fmt::Debug + IntoBuf,
@@ -140,8 +136,11 @@ impl<T, B> fmt::Debug for Handshake<T, B>
impl<B: IntoBuf> Stream<B> {
/// Receive the HTTP/2.0 response, if it is ready.
pub fn poll_response(&mut self) -> Poll<Response<()>, ConnectionError> {
self.inner.poll_response()
pub fn poll_response(&mut self) -> Poll<Response<Body<B>>, ConnectionError> {
let (parts, _) = try_ready!(self.inner.poll_response()).into_parts();
let body = Body { inner: self.inner.clone() };
Ok(Response::from_parts(parts, body).into())
}
/// Send data
@@ -160,7 +159,7 @@ impl<B: IntoBuf> Stream<B> {
}
impl<B: IntoBuf> Future for Stream<B> {
type Item = Response<()>;
type Item = Response<Body<B>>;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

View File

@@ -67,6 +67,15 @@ pub enum Frame<T = Bytes> {
}
impl<T> Frame<T> {
/// Returns true if the frame is a DATA frame.
pub fn is_data(&self) -> bool {
use self::Frame::*;
match *self {
Data(..) => true,
_ => false,
}
}
}
impl<T> fmt::Debug for Frame<T> {

View File

@@ -127,19 +127,8 @@ impl<T, P, B> Connection<T, P, B>
*/
}
Some(Data(frame)) => {
unimplemented!();
/*
trace!("recv DATA; frame={:?}", frame);
try!(self.streams.recv_data(&frame));
let frame = Frame::Data {
id: frame.stream_id(),
end_of_stream: frame.is_end_stream(),
data: frame.into_payload(),
};
return Ok(Some(frame).into());
*/
try!(self.streams.recv_data(frame));
}
Some(Reset(frame)) => {
unimplemented!();

View File

@@ -6,7 +6,7 @@ mod settings;
mod streams;
pub use self::connection::Connection;
pub use self::streams::{Streams, StreamRef};
pub use self::streams::{Streams, StreamRef, Chunk};
use self::framed_read::FramedRead;
use self::framed_write::FramedWrite;

View File

@@ -88,4 +88,53 @@ impl<B> Deque<B> {
None => None,
}
}
pub fn take_while<F>(&mut self, buf: &mut Buffer<B>, mut f: F) -> Self
where F: FnMut(&Frame<B>) -> bool
{
match self.indices {
Some(mut idxs) => {
if !f(&buf.slab[idxs.head].frame) {
return Deque::new();
}
let head = idxs.head;
let mut tail = idxs.head;
loop {
let next = match buf.slab[tail].next {
Some(next) => next,
None => {
self.indices = None;
return Deque {
indices: Some(idxs),
_p: PhantomData,
};
}
};
if !f(&buf.slab[next].frame) {
// Split the linked list
buf.slab[tail].next = None;
self.indices = Some(Indices {
head: next,
tail: idxs.tail,
});
return Deque {
indices: Some(Indices {
head: head,
tail: tail,
}),
_p: PhantomData,
}
}
tail = next;
}
}
None => Deque::new(),
}
}
}

View File

@@ -8,7 +8,7 @@ mod store;
mod stream;
mod streams;
pub use self::streams::{Streams, StreamRef};
pub use self::streams::{Streams, StreamRef, Chunk};
use self::buffer::Buffer;
use self::flow_control::FlowControl;

View File

@@ -32,6 +32,12 @@ pub(super) struct Recv<P, B> {
_p: PhantomData<(P, B)>,
}
#[derive(Debug)]
pub(super) struct Chunk {
/// Data frames pending receival
pub pending_recv: buffer::Deque<Bytes>,
}
impl<P, B> Recv<P, B>
where P: Peer,
B: Buf,
@@ -98,7 +104,7 @@ impl<P, B> Recv<P, B>
}
pub fn recv_data(&mut self,
frame: &frame::Data,
frame: frame::Data,
stream: &mut Stream<B>)
-> Result<(), ConnectionError>
{
@@ -130,6 +136,10 @@ impl<P, B> Recv<P, B>
try!(stream.state.recv_close());
}
// Push the frame onto the recv buffer
stream.pending_recv.push_back(&mut self.buffer, frame.into());
stream.notify_recv();
Ok(())
}
@@ -218,6 +228,33 @@ impl<P, B> Recv<P, B>
Ok(().into())
}
pub fn poll_chunk(&mut self, stream: &mut Stream<B>)
-> Poll<Option<Chunk>, ConnectionError>
{
let frames = stream.pending_recv
.take_while(&mut self.buffer, |frame| frame.is_data());
if frames.is_empty() {
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
} else {
Ok(Some(Chunk {
pending_recv: frames,
}).into())
}
}
pub fn pop_bytes(&mut self, chunk: &mut Chunk) -> Option<Bytes> {
match chunk.pending_recv.pop_front(&mut self.buffer) {
Some(Frame::Data(frame)) => {
Some(frame.into_payload())
}
None => None,
_ => panic!("unexpected frame type"),
}
}
/// Send stream level window update
pub fn send_stream_window_update<T>(&mut self,
streams: &mut Store<B>,

View File

@@ -18,6 +18,15 @@ pub struct StreamRef<P, B> {
key: store::Key,
}
#[derive(Debug)]
pub struct Chunk<P, B>
where P: Peer,
B: Buf,
{
inner: Arc<Mutex<Inner<P, B>>>,
recv: recv::Chunk,
}
/// Fields needed to manage state related to managing the set of streams. This
/// is mostly split out to make ownership happy.
///
@@ -103,7 +112,7 @@ impl<P, B> Streams<P, B>
Ok(ret)
}
pub fn recv_data(&mut self, frame: &frame::Data)
pub fn recv_data(&mut self, frame: frame::Data)
-> Result<(), ConnectionError>
{
let id = frame.stream_id();
@@ -305,6 +314,34 @@ impl<B> Streams<client::Peer, B>
}
}
// ===== impl StreamRef =====
impl<P, B> StreamRef<P, B>
where P: Peer,
B: Buf,
{
pub fn poll_data(&mut self) -> Poll<Option<Chunk<P, B>>, ConnectionError> {
let recv = {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.key);
try_ready!(me.actions.recv.poll_chunk(&mut stream))
};
// Convert to a chunk
let chunk = recv.map(|recv| {
Chunk {
inner: self.inner.clone(),
recv: recv,
}
});
Ok(chunk.into())
}
}
impl<B> StreamRef<client::Peer, B>
where B: Buf,
{
@@ -318,6 +355,34 @@ impl<B> StreamRef<client::Peer, B>
}
}
impl<P, B> Clone for StreamRef<P, B> {
fn clone(&self) -> Self {
StreamRef {
inner: self.inner.clone(),
key: self.key.clone(),
}
}
}
// ===== impl Chunk =====
impl<P, B> Drop for Chunk<P, B>
where P: Peer,
B: Buf,
{
fn drop(&mut self) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
while let Some(_) = me.actions.recv.pop_bytes(&mut self.recv) {
}
}
}
// ===== impl Actions =====
impl<P, B> Actions<P, B>
where P: Peer,
B: Buf,