Merge branch 'master' into ver/flowio
This commit is contained in:
@@ -1,12 +1,14 @@
|
||||
use {Frame, ConnectionError, Peer, StreamId};
|
||||
use Frame;
|
||||
use client::Client;
|
||||
use frame::{Frame as WireFrame};
|
||||
use proto::{self, FlowController, ReadySink, PeerState, State, WindowUpdate};
|
||||
use error::{self, ConnectionError};
|
||||
use frame::{self, StreamId};
|
||||
use proto::{self, Peer, ReadySink, State, PeerState, WindowUpdate, FlowController};
|
||||
use server::Server;
|
||||
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use http::{request, response};
|
||||
use bytes::{Bytes, IntoBuf};
|
||||
|
||||
use futures::*;
|
||||
|
||||
@@ -21,8 +23,8 @@ use std::marker::PhantomData;
|
||||
|
||||
/// An H2 connection
|
||||
#[derive(Debug)]
|
||||
pub struct Connection<T, P> {
|
||||
inner: proto::Inner<T>,
|
||||
pub struct Connection<T, P, B: IntoBuf = Bytes> {
|
||||
inner: proto::Inner<T, B::Buf>,
|
||||
streams: StreamMap<State>,
|
||||
peer: PhantomData<P>,
|
||||
|
||||
@@ -39,12 +41,13 @@ pub struct Connection<T, P> {
|
||||
|
||||
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
|
||||
|
||||
pub fn new<T, P>(transport: proto::Inner<T>,
|
||||
pub fn new<T, P, B>(transport: proto::Inner<T, B::Buf>,
|
||||
initial_local_window_size: u32,
|
||||
initial_remote_window_size: u32)
|
||||
-> Connection<T, P>
|
||||
-> Connection<T, P, B>
|
||||
where T: AsyncRead + AsyncWrite,
|
||||
P: Peer,
|
||||
B: IntoBuf,
|
||||
{
|
||||
Connection {
|
||||
inner: transport,
|
||||
@@ -62,7 +65,8 @@ pub fn new<T, P>(transport: proto::Inner<T>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, P> Connection<T, P> {
|
||||
impl<T, P, B: IntoBuf> Connection<T, P, B> {
|
||||
|
||||
/// Publishes local stream window updates to the remote.
|
||||
///
|
||||
/// Connection window updates (StreamId=0) and stream window must be published
|
||||
@@ -118,8 +122,28 @@ impl<T, P> Connection<T, P> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Connection<T, Client>
|
||||
impl<T, P, B> Connection<T, P, B>
|
||||
where T: AsyncRead + AsyncWrite,
|
||||
P: Peer,
|
||||
B: IntoBuf,
|
||||
{
|
||||
pub fn send_data(self,
|
||||
id: StreamId,
|
||||
data: B,
|
||||
end_of_stream: bool)
|
||||
-> sink::Send<Self>
|
||||
{
|
||||
self.send(Frame::Data {
|
||||
id: id,
|
||||
data: data,
|
||||
end_of_stream: end_of_stream,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, B> Connection<T, Client, B>
|
||||
where T: AsyncRead + AsyncWrite,
|
||||
B: IntoBuf,
|
||||
{
|
||||
pub fn send_request(self,
|
||||
id: StreamId, // TODO: Generate one internally?
|
||||
@@ -135,8 +159,9 @@ impl<T> Connection<T, Client>
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Connection<T, Server>
|
||||
impl<T, B> Connection<T, Server, B>
|
||||
where T: AsyncRead + AsyncWrite,
|
||||
B: IntoBuf,
|
||||
{
|
||||
pub fn send_response(self,
|
||||
id: StreamId, // TODO: Generate one internally?
|
||||
@@ -152,17 +177,19 @@ impl<T> Connection<T, Server>
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, P> Stream for Connection<T, P>
|
||||
impl<T, P, B> Stream for Connection<T, P, B>
|
||||
where T: AsyncRead + AsyncWrite,
|
||||
P: Peer,
|
||||
B: IntoBuf,
|
||||
{
|
||||
type Item = Frame<P::Poll>;
|
||||
type Error = ConnectionError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> {
|
||||
use frame::Frame::*;
|
||||
trace!("Connection::poll");
|
||||
|
||||
loop {
|
||||
loop {
|
||||
let frame = match try!(self.inner.poll()) {
|
||||
Async::Ready(f) => f,
|
||||
Async::NotReady => {
|
||||
@@ -176,7 +203,7 @@ impl<T, P> Stream for Connection<T, P>
|
||||
trace!("received; frame={:?}", frame);
|
||||
|
||||
let frame = match frame {
|
||||
Some(WireFrame::Headers(v)) => {
|
||||
Some(Headers(v)) => {
|
||||
// TODO: Update stream state
|
||||
let stream_id = v.stream_id();
|
||||
let end_of_stream = v.is_end_stream();
|
||||
@@ -204,20 +231,24 @@ impl<T, P> Stream for Connection<T, P>
|
||||
end_of_stream: end_of_stream,
|
||||
}
|
||||
}
|
||||
Some(WireFrame::Data(v)) => {
|
||||
Some(Data(v)) => {
|
||||
// TODO: Validate frame
|
||||
|
||||
let stream_id = v.stream_id();
|
||||
let end_of_stream = v.is_end_stream();
|
||||
match self.streams.get_mut(&stream_id) {
|
||||
None => return Err(error::Reason::ProtocolError.into()),
|
||||
Some(state) => try!(state.recv_data(end_of_stream)),
|
||||
}
|
||||
|
||||
Frame::Body {
|
||||
Frame::Data {
|
||||
id: stream_id,
|
||||
chunk: v.into_payload(),
|
||||
data: v.into_payload(),
|
||||
end_of_stream: end_of_stream,
|
||||
}
|
||||
}
|
||||
Some(WireFrame::WindowUpdate(v)) => {
|
||||
self.increment_remote_window(v.stream_id(), v.increment());
|
||||
Some(WindowUpdate(v)) => {
|
||||
self.increment_remote_window(v.stream_id(), v.size_increment());
|
||||
continue;
|
||||
}
|
||||
Some(frame) => panic!("unexpected frame; frame={:?}", frame),
|
||||
@@ -229,16 +260,19 @@ impl<T, P> Stream for Connection<T, P>
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, P> Sink for Connection<T, P>
|
||||
impl<T, P, B> Sink for Connection<T, P, B>
|
||||
where T: AsyncRead + AsyncWrite,
|
||||
P: Peer,
|
||||
B: IntoBuf,
|
||||
{
|
||||
type SinkItem = Frame<P::Send>;
|
||||
type SinkItem = Frame<P::Send, B>;
|
||||
type SinkError = ConnectionError;
|
||||
|
||||
fn start_send(&mut self, item: Self::SinkItem)
|
||||
-> StartSend<Self::SinkItem, Self::SinkError>
|
||||
{
|
||||
use frame::Frame::Headers;
|
||||
|
||||
// First ensure that the upstream can process a new item
|
||||
if !try!(self.poll_ready()).is_ready() {
|
||||
return Ok(AsyncSink::NotReady(item));
|
||||
@@ -262,7 +296,8 @@ impl<T, P> Sink for Connection<T, P>
|
||||
// connections should not be factored.
|
||||
//
|
||||
if !P::is_valid_local_stream_id(id) {
|
||||
unimplemented!();
|
||||
// TODO: clear state
|
||||
return Err(error::User::InvalidStreamId.into());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -270,7 +305,7 @@ impl<T, P> Sink for Connection<T, P>
|
||||
|
||||
// We already ensured that the upstream can handle the frame, so
|
||||
// panic if it gets rejected.
|
||||
let res = try!(self.inner.start_send(WireFrame::Headers(frame)));
|
||||
let res = try!(self.inner.start_send(Headers(frame)));
|
||||
|
||||
// This is a one-way conversion. By checking `poll_ready` first,
|
||||
// it's already been determined that the inner `Sink` can accept
|
||||
@@ -279,6 +314,25 @@ impl<T, P> Sink for Connection<T, P>
|
||||
|
||||
Ok(AsyncSink::Ready)
|
||||
}
|
||||
Frame::Data { id, data, end_of_stream } => {
|
||||
// The stream must be initialized at this point
|
||||
match self.streams.get_mut(&id) {
|
||||
None => return Err(error::User::InactiveStreamId.into()),
|
||||
Some(state) => try!(state.send_data(end_of_stream)),
|
||||
}
|
||||
|
||||
let mut frame = frame::Data::new(id, data.into_buf());
|
||||
|
||||
if end_of_stream {
|
||||
frame.set_end_stream();
|
||||
}
|
||||
|
||||
let res = try!(self.inner.start_send(frame.into()));
|
||||
|
||||
assert!(res.is_ready());
|
||||
|
||||
Ok(AsyncSink::Ready)
|
||||
}
|
||||
/*
|
||||
Frame::Trailers { id, headers } => {
|
||||
unimplemented!();
|
||||
@@ -302,9 +356,10 @@ impl<T, P> Sink for Connection<T, P>
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, P> ReadySink for Connection<T, P>
|
||||
impl<T, P, B> ReadySink for Connection<T, P, B>
|
||||
where T: AsyncRead + AsyncWrite,
|
||||
P: Peer,
|
||||
B: IntoBuf,
|
||||
{
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::SinkError> {
|
||||
self.inner.poll_ready()
|
||||
|
||||
Reference in New Issue
Block a user