add Server Builder

This commit is contained in:
Sean McArthur
2017-09-12 11:38:23 -07:00
parent e4b8dde1d3
commit 7531e5276b

View File

@@ -1,6 +1,5 @@
use codec::{Codec, RecvError}; use codec::{Codec, RecvError};
use frame::{self, Reason, Settings, StreamId}; use frame::{self, Reason, Settings, StreamId};
use frame::Reason::*;
use proto::{self, Connection, WindowSize}; use proto::{self, Connection, WindowSize};
use bytes::{Buf, Bytes, IntoBuf}; use bytes::{Buf, Bytes, IntoBuf};
@@ -21,6 +20,12 @@ pub struct Server<T, B: IntoBuf> {
connection: Connection<T, Peer, B>, connection: Connection<T, Peer, B>,
} }
/// Build a Server
#[derive(Debug, Default)]
pub struct Builder {
settings: Settings,
}
#[derive(Debug)] #[derive(Debug)]
pub struct Stream<B: IntoBuf> { pub struct Stream<B: IntoBuf> {
inner: proto::StreamRef<B::Buf, Peer>, inner: proto::StreamRef<B::Buf, Peer>,
@@ -63,8 +68,19 @@ impl<T> Server<T, Bytes>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
{ {
/// Bind an H2 server connection.
///
/// Returns a future which resolves to the connection value once the H2
/// handshake has been completed.
pub fn handshake(io: T) -> Handshake<T, Bytes> { pub fn handshake(io: T) -> Handshake<T, Bytes> {
Server::handshake2(io) Server::builder().handshake(io)
}
}
impl Server<(), Bytes> {
/// Create a Server Builder
pub fn builder() -> Builder {
Builder::default()
} }
} }
@@ -73,27 +89,20 @@ where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
B: IntoBuf + 'static, B: IntoBuf + 'static,
{ {
/// Bind an H2 server connection. fn handshake2(io: T, settings: Settings) -> Handshake<T, B> {
///
/// Returns a future which resolves to the connection value once the H2
/// handshake has been completed.
pub fn handshake2(io: T) -> Handshake<T, B> {
// Create the codec // Create the codec
let mut codec = Codec::new(io); let mut codec = Codec::new(io);
// Create the initial SETTINGS frame
let settings = Settings::default();
// Send initial settings frame // Send initial settings frame
codec codec
.buffer(settings.into()) .buffer(settings.clone().into())
.expect("invalid SETTINGS frame"); .expect("invalid SETTINGS frame");
// Flush pending settings frame and then wait for the client preface // Flush pending settings frame and then wait for the client preface
let handshake = Flush::new(codec) let handshake = Flush::new(codec)
.and_then(ReadPreface::new) .and_then(ReadPreface::new)
.map(move |codec| { .map(move |codec| {
let connection = Connection::new(codec, &Settings::default()); let connection = Connection::new(codec, &settings);
Server { Server {
connection, connection,
} }
@@ -162,6 +171,27 @@ where
} }
} }
// ===== impl Builder =====
impl Builder {
/// Set the initial window size of the remote peer.
pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
self.settings.set_initial_window_size(Some(size));
self
}
/// Bind an H2 server connection.
///
/// Returns a future which resolves to the connection value once the H2
/// handshake has been completed.
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
where T: AsyncRead + AsyncWrite + 'static,
B: IntoBuf + 'static
{
Server::handshake2(io, self.settings.clone())
}
}
// ===== impl Stream ===== // ===== impl Stream =====
impl<B: IntoBuf> Stream<B> { impl<B: IntoBuf> Stream<B> {
@@ -367,7 +397,7 @@ where
if PREFACE[self.pos..self.pos + n] != buf[..n] { if PREFACE[self.pos..self.pos + n] != buf[..n] {
// TODO: Should this just write the GO_AWAY frame directly? // TODO: Should this just write the GO_AWAY frame directly?
return Err(ProtocolError.into()); return Err(Reason::ProtocolError.into());
} }
self.pos += n; self.pos += n;
@@ -450,7 +480,7 @@ impl proto::Peer for Peer {
() => { () => {
return Err(RecvError::Stream { return Err(RecvError::Stream {
id: stream_id, id: stream_id,
reason: ProtocolError, reason: Reason::ProtocolError,
}); });
} }
}; };
@@ -465,7 +495,7 @@ impl proto::Peer for Peer {
// Specifying :status for a request is a protocol error // Specifying :status for a request is a protocol error
if pseudo.status.is_some() { if pseudo.status.is_some() {
return Err(RecvError::Connection(ProtocolError)); return Err(RecvError::Connection(Reason::ProtocolError));
} }
// Convert the URI // Convert the URI
@@ -502,7 +532,7 @@ impl proto::Peer for Peer {
// kinds of errors // kinds of errors
return Err(RecvError::Stream { return Err(RecvError::Stream {
id: stream_id, id: stream_id,
reason: ProtocolError, reason: Reason::ProtocolError,
}); });
}, },
}; };