Shuffle types around

This commit is contained in:
Carl Lerche
2017-06-27 11:04:36 -07:00
parent 1f85d54cff
commit 79aa11ad32
6 changed files with 91 additions and 56 deletions

View File

@@ -22,7 +22,7 @@ pub type Connection<T> = super::Connection<T, Client>;
///
/// Returns a future which resolves to the connection value once the H2
/// handshake has been completed.
pub fn bind<T>(io: T) -> Handshake<T>
pub fn handshake<T>(io: T) -> Handshake<T>
where T: AsyncRead + AsyncWrite + 'static,
{
use tokio_io::io;
@@ -30,12 +30,11 @@ pub fn bind<T>(io: T) -> Handshake<T>
debug!("binding client connection");
let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
.then(|res| {
let (io, _) = res.unwrap();
.map(|(io, _)| {
debug!("client connection bound");
// Use default local settings for now
proto::Handshake::new(io, Default::default())
proto::from_io(io, Default::default())
})
.map_err(ConnectionError::from);

View File

@@ -22,29 +22,16 @@ pub struct Connection<T, P> {
peer: PhantomData<P>,
}
impl<T, P> From<proto::Inner<T>> for Connection<T, P>
where T: AsyncRead + AsyncWrite,
P: Peer,
{
fn from(src: proto::Inner<T>) -> Self {
Connection {
inner: src,
streams: StreamMap::default(),
peer: PhantomData,
}
}
}
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
impl<T, P> Connection<T, P>
pub fn new<T, P>(transport: proto::Inner<T>) -> Connection<T, P>
where T: AsyncRead + AsyncWrite,
P: Peer,
{
/// Completes when the connection has terminated
pub fn poll_shutdown(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.poll_complete());
Ok(Async::NotReady)
Connection {
inner: transport,
streams: StreamMap::default(),
peer: PhantomData,
}
}

View File

@@ -1,17 +1,20 @@
use {hpack, ConnectionError};
use frame::{self, Frame, Kind};
use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE;
use tokio_io::AsyncWrite;
use proto::ReadySink;
use futures::*;
use bytes::{Bytes, BytesMut, Buf};
use tokio_io::{AsyncRead};
use tokio_io::codec::length_delimited;
use std::io::{self, Write, Cursor};
#[derive(Debug)]
pub struct FramedRead<T> {
inner: T,
inner: length_delimited::FramedRead<T>,
// hpack decoder state
hpack: hpack::Decoder,
@@ -27,10 +30,10 @@ enum Partial {
}
impl<T> FramedRead<T>
where T: Stream<Item = BytesMut, Error = io::Error>,
T: AsyncWrite,
where T: AsyncRead,
T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
{
pub fn new(inner: T) -> FramedRead<T> {
pub fn new(inner: length_delimited::FramedRead<T>) -> FramedRead<T> {
FramedRead {
inner: inner,
hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
@@ -103,7 +106,7 @@ impl<T> FramedRead<T> {
}
impl<T> Stream for FramedRead<T>
where T: Stream<Item = BytesMut, Error = io::Error>,
where T: AsyncRead,
{
type Item = Frame;
type Error = ConnectionError;
@@ -128,30 +131,26 @@ impl<T: Sink> Sink for FramedRead<T> {
type SinkError = T::SinkError;
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
self.inner.start_send(item)
self.inner.get_mut().start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
self.inner.get_mut().poll_complete()
}
}
impl<T: ReadySink> ReadySink for FramedRead<T> {
fn poll_ready(&mut self) -> Poll<(), Self::SinkError> {
self.inner.get_mut().poll_ready()
}
}
impl<T: io::Write> io::Write for FramedRead<T> {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
self.inner.write(src)
self.inner.get_mut().write(src)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
impl<T: AsyncWrite> AsyncWrite for FramedRead<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.inner.shutdown()
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.write_buf(buf)
self.inner.get_mut().flush()
}
}

View File

@@ -3,7 +3,7 @@ use frame::{self, Frame, Error};
use proto::ReadySink;
use futures::*;
use tokio_io::AsyncWrite;
use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Bytes, BytesMut, Buf, BufMut};
use http::header::{self, HeaderValue};
@@ -176,3 +176,21 @@ impl<T: Stream> Stream for FramedWrite<T> {
self.inner.poll()
}
}
impl<T: io::Read> io::Read for FramedWrite<T> {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
self.inner.read(dst)
}
}
impl<T: AsyncRead> AsyncRead for FramedWrite<T> {
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
where Self: Sized,
{
self.inner.read_buf(buf)
}
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
}

View File

@@ -1,7 +1,6 @@
mod connection;
mod framed_read;
mod framed_write;
mod handshake;
mod ping_pong;
mod ready;
mod settings;
@@ -10,21 +9,50 @@ mod state;
pub use self::connection::{Connection};
pub use self::framed_read::FramedRead;
pub use self::framed_write::FramedWrite;
pub use self::handshake::Handshake;
pub use self::ping_pong::PingPong;
pub use self::ready::ReadySink;
pub use self::settings::Settings;
pub use self::state::State;
use tokio_io::codec::length_delimited;
use {frame, Peer};
/// Base HTTP/2.0 transport. Only handles framing.
type Framed<T> =
FramedWrite<
FramedRead<
length_delimited::FramedRead<T>>>;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
type Inner<T> =
Settings<
PingPong<
Framed<T>>>;
type Framed<T> =
FramedRead<
FramedWrite<T>>;
pub fn from_io<T, P>(io: T, settings: frame::SettingSet)
-> Connection<T, P>
where T: AsyncRead + AsyncWrite,
P: Peer,
{
let framed_write = FramedWrite::new(io);
// Delimit the frames
let framed_read = length_delimited::Builder::new()
.big_endian()
.length_field_length(3)
.length_adjustment(9)
.num_skip(0) // Don't skip the header
.new_read(framed_write);
// Map to `Frame` types
let framed = FramedRead::new(framed_read);
// Add ping/pong handler
let ping_pong = PingPong::new(framed);
// Add settings handler
let settings = Settings::new(
ping_pong, settings);
// Finally, return the constructed `Connection`
connection::new(settings)
}

View File

@@ -20,19 +20,23 @@ pub struct Settings<T> {
// True when the local settings must be flushed to the remote
is_dirty: bool,
// True when we have received a settings frame from the remote.
received_remote: bool,
}
impl<T> Settings<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
{
pub fn new(inner: T, local: frame::SettingSet, remote: frame::SettingSet) -> Settings<T> {
pub fn new(inner: T, local: frame::SettingSet) -> Settings<T> {
Settings {
inner: inner,
local: local,
remote: remote,
remaining_acks: 1,
is_dirty: false,
remote: frame::SettingSet::default(),
remaining_acks: 0,
is_dirty: true,
received_remote: false,
}
}