Start working on server
This commit is contained in:
@@ -62,6 +62,8 @@ impl<T, P> Stream for Connection<T, P>
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> {
|
||||
use frame::Frame::*;
|
||||
|
||||
trace!("Connection::poll");
|
||||
|
||||
let frame = match try!(self.inner.poll()) {
|
||||
Async::Ready(f) => f,
|
||||
Async::NotReady => {
|
||||
@@ -72,6 +74,8 @@ impl<T, P> Stream for Connection<T, P>
|
||||
}
|
||||
};
|
||||
|
||||
trace!("received; frame={:?}", frame);
|
||||
|
||||
let frame = match frame {
|
||||
Some(Headers(v)) => {
|
||||
// TODO: Update stream state
|
||||
|
||||
@@ -28,6 +28,9 @@ type Framed<T> =
|
||||
FramedRead<
|
||||
FramedWrite<T>>;
|
||||
|
||||
/// Create a full H2 transport from an I/O handle.
|
||||
///
|
||||
/// This is called as the final step of the client handshake future.
|
||||
pub fn from_io<T, P>(io: T, settings: frame::SettingSet)
|
||||
-> Connection<T, P>
|
||||
where T: AsyncRead + AsyncWrite,
|
||||
@@ -35,23 +38,49 @@ pub fn from_io<T, P>(io: T, settings: frame::SettingSet)
|
||||
{
|
||||
let framed_write = FramedWrite::new(io);
|
||||
|
||||
// Delimit the frames
|
||||
let framed_read = length_delimited::Builder::new()
|
||||
.big_endian()
|
||||
.length_field_length(3)
|
||||
.length_adjustment(9)
|
||||
.num_skip(0) // Don't skip the header
|
||||
.new_read(framed_write);
|
||||
|
||||
// Map to `Frame` types
|
||||
let framed = FramedRead::new(framed_read);
|
||||
|
||||
// Add ping/pong responder.
|
||||
let ping_pong = PingPong::new(framed);
|
||||
|
||||
// Add settings handler
|
||||
// To avoid code duplication, we're going to go this route. It is a bit
|
||||
// weird, but oh well...
|
||||
let settings = Settings::new(
|
||||
ping_pong, settings);
|
||||
framed_write, settings);
|
||||
|
||||
from_server_handshaker(settings)
|
||||
}
|
||||
|
||||
/// Create a transport prepared to handle the server handshake.
|
||||
///
|
||||
/// When the server is performing the handshake, it is able to only send
|
||||
/// `Settings` frames and is expected to receive the client preface as a byte
|
||||
/// stream. To represent this, `Settings<FramedWrite<T>>` is returned.
|
||||
pub fn server_handshaker<T>(io: T, settings: frame::SettingSet)
|
||||
-> Settings<FramedWrite<T>>
|
||||
where T: AsyncRead + AsyncWrite,
|
||||
{
|
||||
let framed_write = FramedWrite::new(io);
|
||||
|
||||
Settings::new(framed_write, settings)
|
||||
}
|
||||
|
||||
/// Create a full H2 transport from the server handshaker
|
||||
pub fn from_server_handshaker<T, P>(transport: Settings<FramedWrite<T>>)
|
||||
-> Connection<T, P>
|
||||
where T: AsyncRead + AsyncWrite,
|
||||
P: Peer,
|
||||
{
|
||||
let settings = transport.swap_inner(|io| {
|
||||
// Delimit the frames
|
||||
let framed_read = length_delimited::Builder::new()
|
||||
.big_endian()
|
||||
.length_field_length(3)
|
||||
.length_adjustment(9)
|
||||
.num_skip(0) // Don't skip the header
|
||||
.new_read(io);
|
||||
|
||||
// Map to `Frame` types
|
||||
let framed = FramedRead::new(framed_read);
|
||||
|
||||
// Add ping/pong responder.
|
||||
PingPong::new(framed)
|
||||
});
|
||||
|
||||
// Finally, return the constructed `Connection`
|
||||
connection::new(settings)
|
||||
|
||||
@@ -3,6 +3,10 @@ use frame::{self, Frame};
|
||||
use proto::ReadySink;
|
||||
|
||||
use futures::*;
|
||||
use tokio_io::AsyncRead;
|
||||
use bytes::BufMut;
|
||||
|
||||
use std::io;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Settings<T> {
|
||||
@@ -26,8 +30,7 @@ pub struct Settings<T> {
|
||||
}
|
||||
|
||||
impl<T> Settings<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
|
||||
where T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
|
||||
{
|
||||
pub fn new(inner: T, local: frame::SettingSet) -> Settings<T> {
|
||||
Settings {
|
||||
@@ -40,6 +43,20 @@ impl<T> Settings<T>
|
||||
}
|
||||
}
|
||||
|
||||
/// Swap the inner transport while maintaining the current state.
|
||||
pub fn swap_inner<U, F: FnOnce(T) -> U>(self, f: F) -> Settings<U> {
|
||||
let inner = f(self.inner);
|
||||
|
||||
Settings {
|
||||
inner: inner,
|
||||
local: self.local,
|
||||
remote: self.remote,
|
||||
remaining_acks: self.remaining_acks,
|
||||
is_dirty: self.is_dirty,
|
||||
received_remote: self.received_remote,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send_pending(&mut self) -> Poll<(), ConnectionError> {
|
||||
if self.is_dirty {
|
||||
let frame = frame::Settings::new(self.local.clone());
|
||||
@@ -102,8 +119,7 @@ impl<T> Stream for Settings<T>
|
||||
}
|
||||
|
||||
impl<T> Sink for Settings<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
|
||||
where T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
|
||||
{
|
||||
type SinkItem = Frame;
|
||||
type SinkError = ConnectionError;
|
||||
@@ -132,8 +148,7 @@ impl<T> Sink for Settings<T>
|
||||
}
|
||||
|
||||
impl<T> ReadySink for Settings<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
|
||||
where T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
|
||||
T: ReadySink,
|
||||
{
|
||||
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
|
||||
@@ -144,3 +159,21 @@ impl<T> ReadySink for Settings<T>
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: io::Read> io::Read for Settings<T> {
|
||||
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
|
||||
self.inner.read(dst)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead> AsyncRead for Settings<T> {
|
||||
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
|
||||
where Self: Sized,
|
||||
{
|
||||
self.inner.read_buf(buf)
|
||||
}
|
||||
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
self.inner.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user