Get server module compiling again

This commit is contained in:
Carl Lerche
2017-08-08 22:25:05 -07:00
parent 26df3a3698
commit 38762a9711
3 changed files with 144 additions and 104 deletions

View File

@@ -1,4 +1,5 @@
use {frame, ConnectionError, StreamId}; use {frame, ConnectionError, StreamId};
use {Body, Chunk};
use proto::{self, Connection}; use proto::{self, Connection};
use error::Reason::*; use error::Reason::*;
@@ -25,16 +26,6 @@ pub struct Stream<B: IntoBuf> {
inner: proto::StreamRef<B::Buf>, inner: proto::StreamRef<B::Buf>,
} }
#[derive(Debug)]
pub struct Body<B: IntoBuf> {
inner: proto::StreamRef<B::Buf>,
}
#[derive(Debug)]
pub struct Chunk<B: IntoBuf> {
inner: proto::Chunk<B::Buf>,
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Peer; pub(crate) struct Peer;
@@ -71,8 +62,8 @@ impl<T, B> Client<T, B>
// Send initial settings frame // Send initial settings frame
match framed_write.start_send(settings.into()) { match framed_write.start_send(settings.into()) {
Ok(AsyncSink::Ready) => { Ok(AsyncSink::Ready) => {
let conn = proto::from_framed_write(framed_write); let connection = proto::from_framed_write(framed_write);
Ok(Client { connection: conn }) Ok(Client { connection })
} }
Ok(_) => unreachable!(), Ok(_) => unreachable!(),
Err(e) => Err(ConnectionError::from(e)), Err(e) => Err(ConnectionError::from(e)),
@@ -115,7 +106,7 @@ impl<T, B> Future for Client<T, B>
impl<T, B> fmt::Debug for Client<T, B> impl<T, B> fmt::Debug for Client<T, B>
where T: fmt::Debug, where T: fmt::Debug,
B: fmt::Debug + IntoBuf, B: fmt::Debug + IntoBuf,
B::Buf: fmt::Debug + IntoBuf, B::Buf: fmt::Debug,
{ {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Client") fmt.debug_struct("Client")
@@ -180,28 +171,6 @@ impl<B: IntoBuf> Future for Stream<B> {
} }
} }
// ===== impl Body =====
impl<B: IntoBuf> futures::Stream for Body<B> {
type Item = Chunk<B>;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let chunk = try_ready!(self.inner.poll_data())
.map(|inner| Chunk { inner });
Ok(chunk.into())
}
}
// ===== impl Chunk =====
impl<B: IntoBuf> Chunk<B> {
pub fn pop_bytes(&mut self) -> Option<Bytes> {
self.inner.pop_bytes()
}
}
// ===== impl Peer ===== // ===== impl Peer =====
impl proto::Peer for Peer { impl proto::Peer for Peer {

View File

@@ -32,7 +32,7 @@ pub mod error;
mod hpack; mod hpack;
mod proto; mod proto;
mod frame; mod frame;
// pub mod server; pub mod server;
pub use error::{ConnectionError, Reason}; pub use error::{ConnectionError, Reason};
pub use frame::StreamId; pub use frame::StreamId;
@@ -43,6 +43,45 @@ pub type FrameSize = u32;
// TODO: remove if carllerche/http#90 lands // TODO: remove if carllerche/http#90 lands
pub type HeaderMap = http::HeaderMap<http::header::HeaderValue>; pub type HeaderMap = http::HeaderMap<http::header::HeaderValue>;
// TODO: Move into other location
use bytes::IntoBuf;
use futures::Poll;
#[derive(Debug)]
pub struct Body<B: IntoBuf> {
inner: proto::StreamRef<B::Buf>,
}
#[derive(Debug)]
pub struct Chunk<B: IntoBuf> {
inner: proto::Chunk<B::Buf>,
}
// ===== impl Body =====
impl<B: IntoBuf> futures::Stream for Body<B> {
type Item = Chunk<B>;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let chunk = try_ready!(self.inner.poll_data())
.map(|inner| Chunk { inner });
Ok(chunk.into())
}
}
// ===== impl Chunk =====
impl<B: IntoBuf> Chunk<B> {
pub fn pop_bytes(&mut self) -> Option<Bytes> {
self.inner.pop_bytes()
}
}
// TODO: Delete below
/// An H2 connection frame /// An H2 connection frame
#[derive(Debug)] #[derive(Debug)]
pub enum Frame<T, B = Bytes> { pub enum Frame<T, B = Bytes> {

View File

@@ -1,6 +1,9 @@
use {frame, proto, Peer, ConnectionError, StreamId}; use {frame, ConnectionError, StreamId};
use {Body, Chunk};
use proto::{self, Connection};
use error::Reason::*;
use http; use http::{self, Request, Response};
use futures::{Future, Sink, Poll, Async, AsyncSink, IntoFuture}; use futures::{Future, Sink, Poll, Async, AsyncSink, IntoFuture};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Bytes, IntoBuf}; use bytes::{Bytes, IntoBuf};
@@ -14,14 +17,13 @@ pub struct Handshake<T, B: IntoBuf = Bytes> {
} }
/// Marker type indicating a client peer /// Marker type indicating a client peer
#[derive(Debug)]
pub struct Server<T, B: IntoBuf> { pub struct Server<T, B: IntoBuf> {
connection: Connection<T, Peer, B>, connection: Connection<T, Peer, B>,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct Stream<B: IntoBuf> { pub struct Stream<B: IntoBuf> {
inner: proto::StreamRef<Peer, B::Buf>, inner: proto::StreamRef<B::Buf>,
} }
/// Flush a Sink /// Flush a Sink
@@ -35,45 +37,71 @@ struct ReadPreface<T> {
pos: usize, pos: usize,
} }
#[derive(Debug)]
pub(crate) struct Peer;
const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
pub fn handshake<T>(io: T) -> Handshake<T, Bytes> // ===== impl Server =====
impl<T> Server<T, Bytes>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
{ {
handshake2(io) pub fn handshake(io: T) -> Handshake<T, Bytes> {
Server::handshake2(io)
}
} }
/// Bind an H2 server connection. impl<T, B> Server<T, B>
///
/// Returns a future which resolves to the connection value once the H2
/// handshake has been completed.
pub fn handshake2<T, B: IntoBuf>(io: T) -> Handshake<T, B>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
B: 'static, // TODO: Why is this required but not in client? B: IntoBuf + 'static,
{ {
let mut framed_write = proto::framed_write(io); /// Bind an H2 server connection.
let settings = frame::Settings::default(); ///
/// Returns a future which resolves to the connection value once the H2
/// handshake has been completed.
pub fn handshake2(io: T) -> Handshake<T, B> {
let mut framed_write = proto::framed_write(io);
let settings = frame::Settings::default();
// Send initial settings frame // Send initial settings frame
match framed_write.start_send(settings.into()) { match framed_write.start_send(settings.into()) {
Ok(AsyncSink::Ready) => {} Ok(AsyncSink::Ready) => {}
Ok(_) => unreachable!(), Ok(_) => unreachable!(),
Err(e) => { Err(e) => {
return Handshake { return Handshake {
inner: Box::new(Err(ConnectionError::from(e)).into_future()), inner: Box::new(Err(ConnectionError::from(e)).into_future()),
}
} }
} }
// Flush pending settings frame and then wait for the client preface
let handshake = Flush::new(framed_write)
.and_then(ReadPreface::new)
.map(move |framed_write| {
let connection = proto::from_framed_write(framed_write);
Server { connection }
})
;
Handshake { inner: Box::new(handshake) }
} }
// Flush pending settings frame and then wait for the client preface
let handshake = Flush::new(framed_write)
.and_then(ReadPreface::new)
.map(move |framed_write| proto::from_framed_write(framed_write))
;
Handshake { inner: Box::new(handshake) }
} }
impl<T, B> fmt::Debug for Server<T, B>
where T: fmt::Debug,
B: fmt::Debug + IntoBuf,
B::Buf: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Server")
.field("connection", &self.connection)
.finish()
}
}
// ===== impl Flush =====
impl<T> Flush<T> { impl<T> Flush<T> {
fn new(inner: T) -> Self { fn new(inner: T) -> Self {
Flush { inner: Some(inner) } Flush { inner: Some(inner) }
@@ -123,47 +151,10 @@ impl<T: AsyncRead> Future for ReadPreface<T> {
} }
} }
impl Peer for Server { // ===== impl Handshake =====
type Send = http::response::Head;
type Poll = http::request::Head;
fn is_server() -> bool {
true
}
fn convert_send_message(
id: StreamId,
headers: Self::Send,
end_of_stream: bool) -> frame::Headers
{
use http::response::Head;
// Extract the components of the HTTP request
let Head { status, headers, .. } = headers;
// TODO: Ensure that the version is set to H2
// Build the set pseudo header set. All requests will include `method`
// and `path`.
let pseudo = frame::Pseudo::response(status);
// Create the HEADERS frame
let mut frame = frame::Headers::new(id, pseudo, headers);
if end_of_stream {
frame.set_end_stream()
}
frame
}
fn convert_poll_message(headers: frame::Headers) -> Self::Poll {
headers.into_request()
}
}
impl<T, B: IntoBuf> Future for Handshake<T, B> { impl<T, B: IntoBuf> Future for Handshake<T, B> {
type Item = Connection<T, B>; type Item = Server<T, B>;
type Error = ConnectionError; type Error = ConnectionError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@@ -179,3 +170,44 @@ impl<T, B> fmt::Debug for Handshake<T, B>
write!(fmt, "server::Handshake") write!(fmt, "server::Handshake")
} }
} }
impl proto::Peer for Peer {
type Send = Response<()>;
type Poll = Request<()>;
fn is_server() -> bool {
true
}
fn convert_send_message(
id: StreamId,
response: Self::Send,
end_of_stream: bool) -> frame::Headers
{
use http::response::Parts;
// Extract the components of the HTTP request
let (Parts { status, headers, .. }, _) = response.into_parts();
// Build the set pseudo header set. All requests will include `method`
// and `path`.
let pseudo = frame::Pseudo::response(status);
// Create the HEADERS frame
let mut frame = frame::Headers::new(id, pseudo, headers);
if end_of_stream {
frame.set_end_stream()
}
frame
}
fn convert_poll_message(headers: frame::Headers)
-> Result<Self::Poll, ConnectionError>
{
headers.into_request()
// TODO: Is this always a protocol error?
.map_err(|_| ProtocolError.into())
}
}