diff --git a/src/client.rs b/src/client.rs index 54c6bd2..85546f1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -22,7 +22,7 @@ pub type Connection = super::Connection; /// /// Returns a future which resolves to the connection value once the H2 /// handshake has been completed. -pub fn bind(io: T) -> Handshake +pub fn handshake(io: T) -> Handshake where T: AsyncRead + AsyncWrite + 'static, { use tokio_io::io; @@ -30,12 +30,11 @@ pub fn bind(io: T) -> Handshake debug!("binding client connection"); let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") - .then(|res| { - let (io, _) = res.unwrap(); + .map(|(io, _)| { debug!("client connection bound"); // Use default local settings for now - proto::Handshake::new(io, Default::default()) + proto::from_io(io, Default::default()) }) .map_err(ConnectionError::from); diff --git a/src/proto/connection.rs b/src/proto/connection.rs index b5995b3..90f41e8 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -22,29 +22,16 @@ pub struct Connection { peer: PhantomData

, } -impl From> for Connection - where T: AsyncRead + AsyncWrite, - P: Peer, -{ - fn from(src: proto::Inner) -> Self { - Connection { - inner: src, - streams: StreamMap::default(), - peer: PhantomData, - } - } -} - type StreamMap = OrderMap>; -impl Connection +pub fn new(transport: proto::Inner) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, { - /// Completes when the connection has terminated - pub fn poll_shutdown(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.poll_complete()); - Ok(Async::NotReady) + Connection { + inner: transport, + streams: StreamMap::default(), + peer: PhantomData, } } diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 3547f3f..80bd254 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -1,17 +1,20 @@ use {hpack, ConnectionError}; use frame::{self, Frame, Kind}; use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE; - -use tokio_io::AsyncWrite; +use proto::ReadySink; use futures::*; + use bytes::{Bytes, BytesMut, Buf}; +use tokio_io::{AsyncRead}; +use tokio_io::codec::length_delimited; + use std::io::{self, Write, Cursor}; #[derive(Debug)] pub struct FramedRead { - inner: T, + inner: length_delimited::FramedRead, // hpack decoder state hpack: hpack::Decoder, @@ -27,10 +30,10 @@ enum Partial { } impl FramedRead - where T: Stream, - T: AsyncWrite, + where T: AsyncRead, + T: Sink, { - pub fn new(inner: T) -> FramedRead { + pub fn new(inner: length_delimited::FramedRead) -> FramedRead { FramedRead { inner: inner, hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE), @@ -103,7 +106,7 @@ impl FramedRead { } impl Stream for FramedRead - where T: Stream, + where T: AsyncRead, { type Item = Frame; type Error = ConnectionError; @@ -128,30 +131,26 @@ impl Sink for FramedRead { type SinkError = T::SinkError; fn start_send(&mut self, item: T::SinkItem) -> StartSend { - self.inner.start_send(item) + self.inner.get_mut().start_send(item) } fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - self.inner.poll_complete() + self.inner.get_mut().poll_complete() + } +} + +impl ReadySink for FramedRead { + fn poll_ready(&mut self) -> Poll<(), Self::SinkError> { + self.inner.get_mut().poll_ready() } } impl io::Write for FramedRead { fn write(&mut self, src: &[u8]) -> io::Result { - self.inner.write(src) + self.inner.get_mut().write(src) } fn flush(&mut self) -> io::Result<()> { - self.inner.flush() - } -} - -impl AsyncWrite for FramedRead { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.inner.shutdown() - } - - fn write_buf(&mut self, buf: &mut B) -> Poll { - self.inner.write_buf(buf) + self.inner.get_mut().flush() } } diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index ca6dcbe..eb48526 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -3,7 +3,7 @@ use frame::{self, Frame, Error}; use proto::ReadySink; use futures::*; -use tokio_io::AsyncWrite; +use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{Bytes, BytesMut, Buf, BufMut}; use http::header::{self, HeaderValue}; @@ -176,3 +176,21 @@ impl Stream for FramedWrite { self.inner.poll() } } + +impl io::Read for FramedWrite { + fn read(&mut self, dst: &mut [u8]) -> io::Result { + self.inner.read(dst) + } +} + +impl AsyncRead for FramedWrite { + fn read_buf(&mut self, buf: &mut B) -> Poll + where Self: Sized, + { + self.inner.read_buf(buf) + } + + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 91f51c9..0443c50 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,7 +1,6 @@ mod connection; mod framed_read; mod framed_write; -mod handshake; mod ping_pong; mod ready; mod settings; @@ -10,21 +9,50 @@ mod state; pub use self::connection::{Connection}; pub use self::framed_read::FramedRead; pub use self::framed_write::FramedWrite; -pub use self::handshake::Handshake; pub use self::ping_pong::PingPong; pub use self::ready::ReadySink; pub use self::settings::Settings; pub use self::state::State; -use tokio_io::codec::length_delimited; +use {frame, Peer}; -/// Base HTTP/2.0 transport. Only handles framing. -type Framed = - FramedWrite< - FramedRead< - length_delimited::FramedRead>>; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::length_delimited; type Inner = Settings< PingPong< Framed>>; + +type Framed = + FramedRead< + FramedWrite>; + +pub fn from_io(io: T, settings: frame::SettingSet) + -> Connection + where T: AsyncRead + AsyncWrite, + P: Peer, +{ + let framed_write = FramedWrite::new(io); + + // Delimit the frames + let framed_read = length_delimited::Builder::new() + .big_endian() + .length_field_length(3) + .length_adjustment(9) + .num_skip(0) // Don't skip the header + .new_read(framed_write); + + // Map to `Frame` types + let framed = FramedRead::new(framed_read); + + // Add ping/pong handler + let ping_pong = PingPong::new(framed); + + // Add settings handler + let settings = Settings::new( + ping_pong, settings); + + // Finally, return the constructed `Connection` + connection::new(settings) +} diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 17de84f..ae7fc06 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -20,19 +20,23 @@ pub struct Settings { // True when the local settings must be flushed to the remote is_dirty: bool, + + // True when we have received a settings frame from the remote. + received_remote: bool, } impl Settings where T: Stream, T: Sink, { - pub fn new(inner: T, local: frame::SettingSet, remote: frame::SettingSet) -> Settings { + pub fn new(inner: T, local: frame::SettingSet) -> Settings { Settings { inner: inner, local: local, - remote: remote, - remaining_acks: 1, - is_dirty: false, + remote: frame::SettingSet::default(), + remaining_acks: 0, + is_dirty: true, + received_remote: false, } }