Get data frames working

This commit is contained in:
Carl Lerche
2017-07-08 12:34:29 -07:00
parent f6b6d0c7e8
commit 981af88838
17 changed files with 523 additions and 146 deletions

View File

@@ -1,11 +1,14 @@
use {frame, Frame, ConnectionError, Peer, StreamId};
use {Frame, Peer};
use client::Client;
use server::Server;
use frame::{self, StreamId};
use proto::{self, ReadySink, State};
use error::{self, ConnectionError};
use tokio_io::{AsyncRead, AsyncWrite};
use http::{request, response};
use bytes::{Bytes, IntoBuf};
use futures::*;
@@ -17,17 +20,18 @@ use std::hash::BuildHasherDefault;
/// An H2 connection
#[derive(Debug)]
pub struct Connection<T, P> {
inner: proto::Inner<T>,
pub struct Connection<T, P, B: IntoBuf = Bytes> {
inner: proto::Inner<T, B::Buf>,
streams: StreamMap<State>,
peer: PhantomData<P>,
peer: PhantomData<(P, B)>,
}
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
pub fn new<T, P>(transport: proto::Inner<T>) -> Connection<T, P>
pub fn new<T, P, B>(transport: proto::Inner<T, B::Buf>) -> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
Connection {
inner: transport,
@@ -36,8 +40,28 @@ pub fn new<T, P>(transport: proto::Inner<T>) -> Connection<T, P>
}
}
impl<T> Connection<T, Client>
impl<T, P, B> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
pub fn send_data(self,
id: StreamId,
data: B,
end_of_stream: bool)
-> sink::Send<Self>
{
self.send(Frame::Data {
id: id,
data: data,
end_of_stream: end_of_stream,
})
}
}
impl<T, B> Connection<T, Client, B>
where T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
pub fn send_request(self,
id: StreamId, // TODO: Generate one internally?
@@ -53,8 +77,9 @@ impl<T> Connection<T, Client>
}
}
impl<T> Connection<T, Server>
impl<T, B> Connection<T, Server, B>
where T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
pub fn send_response(self,
id: StreamId, // TODO: Generate one internally?
@@ -70,9 +95,10 @@ impl<T> Connection<T, Server>
}
}
impl<T, P> Stream for Connection<T, P>
impl<T, P, B> Stream for Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
type Item = Frame<P::Poll>;
type Error = ConnectionError;
@@ -96,7 +122,6 @@ impl<T, P> Stream for Connection<T, P>
let frame = match frame {
Some(Headers(v)) => {
// TODO: Update stream state
let stream_id = v.stream_id();
let end_of_stream = v.is_end_stream();
@@ -121,14 +146,17 @@ impl<T, P> Stream for Connection<T, P>
}
}
Some(Data(v)) => {
// TODO: Validate frame
let stream_id = v.stream_id();
let end_of_stream = v.is_end_stream();
Frame::Body {
match self.streams.get_mut(&stream_id) {
None => return Err(error::Reason::ProtocolError.into()),
Some(state) => try!(state.recv_data(end_of_stream)),
}
Frame::Data {
id: stream_id,
chunk: v.into_payload(),
data: v.into_payload(),
end_of_stream: end_of_stream,
}
}
@@ -140,11 +168,12 @@ impl<T, P> Stream for Connection<T, P>
}
}
impl<T, P> Sink for Connection<T, P>
impl<T, P, B> Sink for Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
type SinkItem = Frame<P::Send>;
type SinkItem = Frame<P::Send, B>;
type SinkError = ConnectionError;
fn start_send(&mut self, item: Self::SinkItem)
@@ -171,7 +200,8 @@ impl<T, P> Sink for Connection<T, P>
// connections should not be factored.
//
if !P::is_valid_local_stream_id(id) {
unimplemented!();
// TODO: clear state
return Err(error::User::InvalidStreamId.into());
}
}
@@ -188,6 +218,25 @@ impl<T, P> Sink for Connection<T, P>
Ok(AsyncSink::Ready)
}
Frame::Data { id, data, end_of_stream } => {
// The stream must be initialized at this point
match self.streams.get_mut(&id) {
None => return Err(error::User::InactiveStreamId.into()),
Some(state) => try!(state.send_data(end_of_stream)),
}
let mut frame = frame::Data::new(id, data.into_buf());
if end_of_stream {
frame.set_end_stream();
}
let res = try!(self.inner.start_send(frame.into()));
assert!(res.is_ready());
Ok(AsyncSink::Ready)
}
/*
Frame::Trailers { id, headers } => {
unimplemented!();
@@ -211,9 +260,10 @@ impl<T, P> Sink for Connection<T, P>
}
}
impl<T, P> ReadySink for Connection<T, P>
impl<T, P, B> ReadySink for Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
fn poll_ready(&mut self) -> Poll<(), Self::SinkError> {
self.inner.poll_ready()