diff --git a/examples/client.rs b/examples/client.rs new file mode 100644 index 0000000..fc1c64c --- /dev/null +++ b/examples/client.rs @@ -0,0 +1,53 @@ +extern crate h2; +extern crate http; +extern crate futures; +extern crate tokio_io; +extern crate tokio_core; +extern crate io_dump; +extern crate env_logger; + +use h2::client; + +use http::request; + +use futures::*; + +use tokio_core::reactor; +use tokio_core::net::TcpStream; + +pub fn main() { + let _ = env_logger::init(); + + let mut core = reactor::Core::new().unwrap();; + + let tcp = TcpStream::connect( + &"127.0.0.1:5928".parse().unwrap(), + &core.handle()); + + let tcp = tcp.then(|res| { + let tcp = io_dump::Dump::to_stdout(res.unwrap()); + client::handshake(tcp) + }) + .then(|res| { + let conn = res.unwrap(); + + println!("sending request"); + + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + // request.version = version::H2; + + conn.send_request(1, request, true) + }) + .then(|res| { + let conn = res.unwrap(); + // Get the next message + conn.for_each(|frame| { + println!("RX: {:?}", frame); + Ok(()) + }) + }) + ; + + core.run(tcp).unwrap(); +} diff --git a/examples/server.rs b/examples/server.rs new file mode 100644 index 0000000..f8b7aa3 --- /dev/null +++ b/examples/server.rs @@ -0,0 +1,53 @@ +extern crate h2; +extern crate http; +extern crate futures; +extern crate tokio_io; +extern crate tokio_core; +extern crate io_dump; +extern crate env_logger; + +use h2::server; + +use futures::*; + +use tokio_core::reactor; +use tokio_core::net::TcpListener; + +pub fn main() { + let _ = env_logger::init(); + + let mut core = reactor::Core::new().unwrap();; + let handle = core.handle(); + + let listener = TcpListener::bind( + &"127.0.0.1:5928".parse().unwrap(), + &handle).unwrap(); + + println!("listening on {:?}", listener.local_addr()); + + let server = listener.incoming().for_each(move |(socket, _)| { + let socket = io_dump::Dump::to_stdout(socket); + + let connection = server::handshake(socket) + .then(|res| { + let conn = res.unwrap(); + + println!("H2 connection bound"); + + conn.for_each(|frame| { + println!("RX: {:?}", frame); + Ok(()) + }) + }) + .then(|res| { + let _ = res.unwrap(); + Ok(()) + }) + ; + + handle.spawn(connection); + Ok(()) + }); + + core.run(server).unwrap(); +} diff --git a/src/client.rs b/src/client.rs index 179a0b1..7b5f3cb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -111,6 +111,6 @@ impl Future for Handshake { impl fmt::Debug for Handshake { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "Handshake") + write!(fmt, "client::Handshake") } } diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 6646859..86a051c 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -265,6 +265,16 @@ impl Pseudo { } } + pub fn response(status: StatusCode) -> Self { + Pseudo { + method: None, + scheme: None, + authority: None, + path: None, + status: Some(status), + } + } + pub fn set_scheme(&mut self, scheme: ByteStr) { self.scheme = Some(scheme); } diff --git a/src/lib.rs b/src/lib.rs index 25c6319..acdad1c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,7 @@ pub mod error; pub mod hpack; pub mod proto; pub mod frame; +pub mod server; mod util; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index be453f3..04fb70e 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -62,6 +62,8 @@ impl Stream for Connection fn poll(&mut self) -> Poll, ConnectionError> { use frame::Frame::*; + trace!("Connection::poll"); + let frame = match try!(self.inner.poll()) { Async::Ready(f) => f, Async::NotReady => { @@ -72,6 +74,8 @@ impl Stream for Connection } }; + trace!("received; frame={:?}", frame); + let frame = match frame { Some(Headers(v)) => { // TODO: Update stream state diff --git a/src/proto/mod.rs b/src/proto/mod.rs index b09d802..923a284 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -28,6 +28,9 @@ type Framed = FramedRead< FramedWrite>; +/// Create a full H2 transport from an I/O handle. +/// +/// This is called as the final step of the client handshake future. pub fn from_io(io: T, settings: frame::SettingSet) -> Connection where T: AsyncRead + AsyncWrite, @@ -35,23 +38,49 @@ pub fn from_io(io: T, settings: frame::SettingSet) { let framed_write = FramedWrite::new(io); - // Delimit the frames - let framed_read = length_delimited::Builder::new() - .big_endian() - .length_field_length(3) - .length_adjustment(9) - .num_skip(0) // Don't skip the header - .new_read(framed_write); - - // Map to `Frame` types - let framed = FramedRead::new(framed_read); - - // Add ping/pong responder. - let ping_pong = PingPong::new(framed); - - // Add settings handler + // To avoid code duplication, we're going to go this route. It is a bit + // weird, but oh well... let settings = Settings::new( - ping_pong, settings); + framed_write, settings); + + from_server_handshaker(settings) +} + +/// Create a transport prepared to handle the server handshake. +/// +/// When the server is performing the handshake, it is able to only send +/// `Settings` frames and is expected to receive the client preface as a byte +/// stream. To represent this, `Settings>` is returned. +pub fn server_handshaker(io: T, settings: frame::SettingSet) + -> Settings> + where T: AsyncRead + AsyncWrite, +{ + let framed_write = FramedWrite::new(io); + + Settings::new(framed_write, settings) +} + +/// Create a full H2 transport from the server handshaker +pub fn from_server_handshaker(transport: Settings>) + -> Connection + where T: AsyncRead + AsyncWrite, + P: Peer, +{ + let settings = transport.swap_inner(|io| { + // Delimit the frames + let framed_read = length_delimited::Builder::new() + .big_endian() + .length_field_length(3) + .length_adjustment(9) + .num_skip(0) // Don't skip the header + .new_read(io); + + // Map to `Frame` types + let framed = FramedRead::new(framed_read); + + // Add ping/pong responder. + PingPong::new(framed) + }); // Finally, return the constructed `Connection` connection::new(settings) diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 0b8ad4e..959bec9 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -3,6 +3,10 @@ use frame::{self, Frame}; use proto::ReadySink; use futures::*; +use tokio_io::AsyncRead; +use bytes::BufMut; + +use std::io; #[derive(Debug)] pub struct Settings { @@ -26,8 +30,7 @@ pub struct Settings { } impl Settings - where T: Stream, - T: Sink, + where T: Sink, { pub fn new(inner: T, local: frame::SettingSet) -> Settings { Settings { @@ -40,6 +43,20 @@ impl Settings } } + /// Swap the inner transport while maintaining the current state. + pub fn swap_inner U>(self, f: F) -> Settings { + let inner = f(self.inner); + + Settings { + inner: inner, + local: self.local, + remote: self.remote, + remaining_acks: self.remaining_acks, + is_dirty: self.is_dirty, + received_remote: self.received_remote, + } + } + fn try_send_pending(&mut self) -> Poll<(), ConnectionError> { if self.is_dirty { let frame = frame::Settings::new(self.local.clone()); @@ -102,8 +119,7 @@ impl Stream for Settings } impl Sink for Settings - where T: Stream, - T: Sink, + where T: Sink, { type SinkItem = Frame; type SinkError = ConnectionError; @@ -132,8 +148,7 @@ impl Sink for Settings } impl ReadySink for Settings - where T: Stream, - T: Sink, + where T: Sink, T: ReadySink, { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { @@ -144,3 +159,21 @@ impl ReadySink for Settings Ok(Async::NotReady) } } + +impl io::Read for Settings { + fn read(&mut self, dst: &mut [u8]) -> io::Result { + self.inner.read(dst) + } +} + +impl AsyncRead for Settings { + fn read_buf(&mut self, buf: &mut B) -> Poll + where Self: Sized, + { + self.inner.read_buf(buf) + } + + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..ce2b257 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,159 @@ +use {frame, proto, Peer, ConnectionError, StreamId}; + +use http; +use futures::{Future, Sink, Poll, Async}; +use tokio_io::{AsyncRead, AsyncWrite}; + +use std::fmt; + +/// In progress H2 connection binding +pub struct Handshake { + // TODO: unbox + inner: Box, Error = ConnectionError>>, +} + +/// Marker type indicating a client peer +#[derive(Debug)] +pub struct Server; + +pub type Connection = super::Connection; + +/// Flush a Sink +struct Flush { + inner: Option, +} + +/// Read the client connection preface +struct ReadPreface { + inner: Option, + pos: usize, +} + +const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; + +/// Bind an H2 server connection. +/// +/// Returns a future which resolves to the connection value once the H2 +/// handshake has been completed. +pub fn handshake(io: T) -> Handshake + where T: AsyncRead + AsyncWrite + 'static, +{ + let transport = proto::server_handshaker(io, Default::default()); + + // Flush pending settings frame and then wait for the client preface + let handshake = Flush::new(transport) + .and_then(ReadPreface::new) + .map(proto::from_server_handshaker) + ; + + Handshake { inner: Box::new(handshake) } +} + +impl Flush { + fn new(inner: T) -> Self { + Flush { inner: Some(inner) } + } +} + +impl Future for Flush { + type Item = T; + type Error = T::SinkError; + + fn poll(&mut self) -> Poll { + try_ready!(self.inner.as_mut().unwrap().poll_complete()); + Ok(Async::Ready(self.inner.take().unwrap())) + } +} + +impl ReadPreface { + fn new(inner: T) -> Self { + ReadPreface { + inner: Some(inner), + pos: 0, + } + } +} + +impl Future for ReadPreface { + type Item = T; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll { + let mut buf = [0; 24]; + let rem = PREFACE.len() - self.pos; + + while rem > 0 { + let n = try_nb!(self.inner.as_mut().unwrap().read(&mut buf[..rem])); + + if PREFACE[self.pos..self.pos+n] != buf[..n] { + // TODO: Invalid connection preface + unimplemented!(); + } + + self.pos += n; + } + + Ok(Async::Ready(self.inner.take().unwrap())) + } +} + +impl Peer for Server { + type Send = http::response::Head; + type Poll = http::request::Head; + + fn check_initiating_id(id: StreamId) -> Result<(), ConnectionError> { + if id % 2 == 1 { + // Server stream identifiers must be even + unimplemented!(); + } + + // TODO: Ensure the `id` doesn't overflow u31 + + Ok(()) + } + + fn convert_send_message( + id: StreamId, + headers: Self::Send, + end_of_stream: bool) -> frame::Headers + { + use http::response::Head; + + // Extract the components of the HTTP request + let Head { status, headers, .. } = headers; + + // TODO: Ensure that the version is set to H2 + + // Build the set pseudo header set. All requests will include `method` + // and `path`. + let pseudo = frame::Pseudo::response(status); + + // Create the HEADERS frame + let mut frame = frame::Headers::new(id, pseudo, headers); + + if end_of_stream { + frame.set_end_stream() + } + + frame + } + + fn convert_poll_message(headers: frame::Headers) -> Self::Poll { + headers.into_request() + } +} + +impl Future for Handshake { + type Item = Connection; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll { + self.inner.poll() + } +} + +impl fmt::Debug for Handshake { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "server::Handshake") + } +}