From 38762a9711a3cefdb48afe9d6ae0b0ddce92bca9 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 8 Aug 2017 22:25:05 -0700 Subject: [PATCH] Get server module compiling again --- src/client.rs | 39 ++---------- src/lib.rs | 41 +++++++++++- src/server.rs | 168 ++++++++++++++++++++++++++++++-------------------- 3 files changed, 144 insertions(+), 104 deletions(-) diff --git a/src/client.rs b/src/client.rs index ead699f..c0c58ee 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,5 @@ use {frame, ConnectionError, StreamId}; +use {Body, Chunk}; use proto::{self, Connection}; use error::Reason::*; @@ -25,16 +26,6 @@ pub struct Stream { inner: proto::StreamRef, } -#[derive(Debug)] -pub struct Body { - inner: proto::StreamRef, -} - -#[derive(Debug)] -pub struct Chunk { - inner: proto::Chunk, -} - #[derive(Debug)] pub(crate) struct Peer; @@ -71,8 +62,8 @@ impl Client // Send initial settings frame match framed_write.start_send(settings.into()) { Ok(AsyncSink::Ready) => { - let conn = proto::from_framed_write(framed_write); - Ok(Client { connection: conn }) + let connection = proto::from_framed_write(framed_write); + Ok(Client { connection }) } Ok(_) => unreachable!(), Err(e) => Err(ConnectionError::from(e)), @@ -115,7 +106,7 @@ impl Future for Client impl fmt::Debug for Client where T: fmt::Debug, B: fmt::Debug + IntoBuf, - B::Buf: fmt::Debug + IntoBuf, + B::Buf: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("Client") @@ -180,28 +171,6 @@ impl Future for Stream { } } -// ===== impl Body ===== - -impl futures::Stream for Body { - type Item = Chunk; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, Self::Error> { - let chunk = try_ready!(self.inner.poll_data()) - .map(|inner| Chunk { inner }); - - Ok(chunk.into()) - } -} - -// ===== impl Chunk ===== - -impl Chunk { - pub fn pop_bytes(&mut self) -> Option { - self.inner.pop_bytes() - } -} - // ===== impl Peer ===== impl proto::Peer for Peer { diff --git a/src/lib.rs b/src/lib.rs index 08ac626..479f549 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,7 +32,7 @@ pub mod error; mod hpack; mod proto; mod frame; -// pub mod server; +pub mod server; pub use error::{ConnectionError, Reason}; pub use frame::StreamId; @@ -43,6 +43,45 @@ pub type FrameSize = u32; // TODO: remove if carllerche/http#90 lands pub type HeaderMap = http::HeaderMap; +// TODO: Move into other location + +use bytes::IntoBuf; +use futures::Poll; + +#[derive(Debug)] +pub struct Body { + inner: proto::StreamRef, +} + +#[derive(Debug)] +pub struct Chunk { + inner: proto::Chunk, +} + +// ===== impl Body ===== + +impl futures::Stream for Body { + type Item = Chunk; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, Self::Error> { + let chunk = try_ready!(self.inner.poll_data()) + .map(|inner| Chunk { inner }); + + Ok(chunk.into()) + } +} + +// ===== impl Chunk ===== + +impl Chunk { + pub fn pop_bytes(&mut self) -> Option { + self.inner.pop_bytes() + } +} + +// TODO: Delete below + /// An H2 connection frame #[derive(Debug)] pub enum Frame { diff --git a/src/server.rs b/src/server.rs index 5293a7b..bd74a2d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,9 @@ -use {frame, proto, Peer, ConnectionError, StreamId}; +use {frame, ConnectionError, StreamId}; +use {Body, Chunk}; +use proto::{self, Connection}; +use error::Reason::*; -use http; +use http::{self, Request, Response}; use futures::{Future, Sink, Poll, Async, AsyncSink, IntoFuture}; use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{Bytes, IntoBuf}; @@ -14,14 +17,13 @@ pub struct Handshake { } /// Marker type indicating a client peer -#[derive(Debug)] pub struct Server { connection: Connection, } #[derive(Debug)] pub struct Stream { - inner: proto::StreamRef, + inner: proto::StreamRef, } /// Flush a Sink @@ -35,45 +37,71 @@ struct ReadPreface { pos: usize, } +#[derive(Debug)] +pub(crate) struct Peer; + const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; -pub fn handshake(io: T) -> Handshake +// ===== impl Server ===== + +impl Server where T: AsyncRead + AsyncWrite + 'static, { - handshake2(io) + pub fn handshake(io: T) -> Handshake { + Server::handshake2(io) + } } -/// Bind an H2 server connection. -/// -/// Returns a future which resolves to the connection value once the H2 -/// handshake has been completed. -pub fn handshake2(io: T) -> Handshake +impl Server where T: AsyncRead + AsyncWrite + 'static, - B: 'static, // TODO: Why is this required but not in client? + B: IntoBuf + 'static, { - let mut framed_write = proto::framed_write(io); - let settings = frame::Settings::default(); + /// Bind an H2 server connection. + /// + /// Returns a future which resolves to the connection value once the H2 + /// handshake has been completed. + pub fn handshake2(io: T) -> Handshake { + let mut framed_write = proto::framed_write(io); + let settings = frame::Settings::default(); - // Send initial settings frame - match framed_write.start_send(settings.into()) { - Ok(AsyncSink::Ready) => {} - Ok(_) => unreachable!(), - Err(e) => { - return Handshake { - inner: Box::new(Err(ConnectionError::from(e)).into_future()), + // Send initial settings frame + match framed_write.start_send(settings.into()) { + Ok(AsyncSink::Ready) => {} + Ok(_) => unreachable!(), + Err(e) => { + return Handshake { + inner: Box::new(Err(ConnectionError::from(e)).into_future()), + } } } + + // Flush pending settings frame and then wait for the client preface + let handshake = Flush::new(framed_write) + .and_then(ReadPreface::new) + .map(move |framed_write| { + let connection = proto::from_framed_write(framed_write); + Server { connection } + }) + ; + + Handshake { inner: Box::new(handshake) } } - - // Flush pending settings frame and then wait for the client preface - let handshake = Flush::new(framed_write) - .and_then(ReadPreface::new) - .map(move |framed_write| proto::from_framed_write(framed_write)) - ; - - Handshake { inner: Box::new(handshake) } } +impl fmt::Debug for Server + where T: fmt::Debug, + B: fmt::Debug + IntoBuf, + B::Buf: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Server") + .field("connection", &self.connection) + .finish() + } +} + +// ===== impl Flush ===== + impl Flush { fn new(inner: T) -> Self { Flush { inner: Some(inner) } @@ -123,47 +151,10 @@ impl Future for ReadPreface { } } -impl Peer for Server { - type Send = http::response::Head; - type Poll = http::request::Head; - - fn is_server() -> bool { - true - } - - fn convert_send_message( - id: StreamId, - headers: Self::Send, - end_of_stream: bool) -> frame::Headers - { - use http::response::Head; - - // Extract the components of the HTTP request - let Head { status, headers, .. } = headers; - - // TODO: Ensure that the version is set to H2 - - // Build the set pseudo header set. All requests will include `method` - // and `path`. - let pseudo = frame::Pseudo::response(status); - - // Create the HEADERS frame - let mut frame = frame::Headers::new(id, pseudo, headers); - - if end_of_stream { - frame.set_end_stream() - } - - frame - } - - fn convert_poll_message(headers: frame::Headers) -> Self::Poll { - headers.into_request() - } -} +// ===== impl Handshake ===== impl Future for Handshake { - type Item = Connection; + type Item = Server; type Error = ConnectionError; fn poll(&mut self) -> Poll { @@ -179,3 +170,44 @@ impl fmt::Debug for Handshake write!(fmt, "server::Handshake") } } + +impl proto::Peer for Peer { + type Send = Response<()>; + type Poll = Request<()>; + + fn is_server() -> bool { + true + } + + fn convert_send_message( + id: StreamId, + response: Self::Send, + end_of_stream: bool) -> frame::Headers + { + use http::response::Parts; + + // Extract the components of the HTTP request + let (Parts { status, headers, .. }, _) = response.into_parts(); + + // Build the set pseudo header set. All requests will include `method` + // and `path`. + let pseudo = frame::Pseudo::response(status); + + // Create the HEADERS frame + let mut frame = frame::Headers::new(id, pseudo, headers); + + if end_of_stream { + frame.set_end_stream() + } + + frame + } + + fn convert_poll_message(headers: frame::Headers) + -> Result + { + headers.into_request() + // TODO: Is this always a protocol error? + .map_err(|_| ProtocolError.into()) + } +}