Get server working again (mostly)

This commit is contained in:
Carl Lerche
2017-08-09 10:36:03 -07:00
parent 38762a9711
commit 8f2b69c280
7 changed files with 176 additions and 61 deletions

View File

@@ -1,4 +1,3 @@
/*
extern crate h2; extern crate h2;
extern crate http; extern crate http;
extern crate futures; extern crate futures;
@@ -7,10 +6,9 @@ extern crate tokio_core;
extern crate io_dump; extern crate io_dump;
extern crate env_logger; extern crate env_logger;
use h2::client; use h2::client::Client;
use http::*; use http::*;
use futures::*; use futures::*;
use tokio_core::reactor; use tokio_core::reactor;
@@ -27,39 +25,24 @@ pub fn main() {
let tcp = tcp.then(|res| { let tcp = tcp.then(|res| {
let tcp = io_dump::Dump::to_stdout(res.unwrap()); let tcp = io_dump::Dump::to_stdout(res.unwrap());
client::handshake(tcp) Client::handshake(tcp)
}) })
.then(|res| { .then(|res| {
let conn = res.unwrap(); let mut client = res.unwrap();
println!("sending request"); println!("sending request");
let mut request = request::Head::default(); let request = Request::builder()
request.method = method::POST; .uri("https://http2.akamai.com/")
request.uri = "https://http2.akamai.com/".parse().unwrap(); .body(()).unwrap();
// request.version = version::H2;
conn.send_request(1.into(), request, true) let stream = client.request(request, true);
}) client.join(stream.and_then(|response| {
/* println!("GOT RESPONSE: {:?}", response);
.then(|res| {
let conn = res.unwrap();
conn.send_data(1.into(), "hello".into(), true)
})
*/
.then(|res| {
let conn = res.unwrap();
// Get the next message
conn.for_each(|frame| {
println!("RX: {:?}", frame);
Ok(()) Ok(())
}) }))
}) })
; ;
core.run(tcp).unwrap(); core.run(tcp).unwrap();
} }
*/
pub fn main() {}

View File

@@ -1,4 +1,3 @@
/*
extern crate h2; extern crate h2;
extern crate http; extern crate http;
extern crate futures; extern crate futures;
@@ -7,10 +6,9 @@ extern crate tokio_core;
extern crate io_dump; extern crate io_dump;
extern crate env_logger; extern crate env_logger;
use h2::server; use h2::server::Server;
use http::{response, status};
use http::*;
use futures::*; use futures::*;
use tokio_core::reactor; use tokio_core::reactor;
@@ -31,12 +29,18 @@ pub fn main() {
let server = listener.incoming().for_each(move |(socket, _)| { let server = listener.incoming().for_each(move |(socket, _)| {
let socket = io_dump::Dump::to_stdout(socket); let socket = io_dump::Dump::to_stdout(socket);
let connection = server::handshake(socket) let connection = Server::handshake(socket)
.then(|res| { .then(|res| {
let conn = res.unwrap(); let conn = res.unwrap();
println!("H2 connection bound"); println!("H2 connection bound");
conn.for_each(|(request, stream)| {
println!("GOT request: {:?}", request);
Ok(())
})
/*
// Receive a request // Receive a request
conn.into_future() conn.into_future()
.then(|res| { .then(|res| {
@@ -60,6 +64,7 @@ pub fn main() {
conn.send_data(1.into(), "world".into(), true) conn.send_data(1.into(), "world".into(), true)
}) })
*/
}) })
.then(|res| { .then(|res| {
let _ = res.unwrap(); let _ = res.unwrap();
@@ -73,6 +78,3 @@ pub fn main() {
core.run(server).unwrap(); core.run(server).unwrap();
} }
*/
pub fn main() {}

View File

@@ -1,4 +1,4 @@
use {client, ConnectionError, Frame}; use {client, server, ConnectionError, Frame};
use HeaderMap; use HeaderMap;
use frame::{self, StreamId}; use frame::{self, StreamId};
@@ -122,19 +122,7 @@ impl<T, P, B> Connection<T, P, B>
match frame { match frame {
Some(Headers(frame)) => { Some(Headers(frame)) => {
trace!("recv HEADERS; frame={:?}", frame); trace!("recv HEADERS; frame={:?}", frame);
try!(self.streams.recv_headers::<P>(frame));
if let Some(frame) = try!(self.streams.recv_headers::<P>(frame)) {
unimplemented!();
}
/*
// Update stream state while ensuring that the headers frame
// can be received.
if let Some(frame) = try!(self.streams.recv_headers(frame)) {
let frame = Self::convert_poll_message(frame)?;
return Ok(Some(frame).into());
}
*/
} }
Some(Data(frame)) => { Some(Data(frame)) => {
trace!("recv DATA; frame={:?}", frame); trace!("recv DATA; frame={:?}", frame);
@@ -269,6 +257,15 @@ impl<T, B> Connection<T, client::Peer, B>
} }
} }
impl<T, B> Connection<T, server::Peer, B>
where T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
pub fn next_incoming(&mut self) -> Option<StreamRef<B::Buf>> {
self.streams.next_incoming()
}
}
/* /*
impl<T, B> Connection<T, Client, B> impl<T, B> Connection<T, Client, B>
where T: AsyncRead + AsyncWrite, where T: AsyncRead + AsyncWrite,

View File

@@ -1,4 +1,4 @@
use {client, frame, ConnectionError}; use {client, server, frame, ConnectionError};
use proto::*; use proto::*;
use super::*; use super::*;
@@ -25,6 +25,9 @@ pub(super) struct Recv<B> {
/// TODO: don't use a VecDeque /// TODO: don't use a VecDeque
pending_window_updates: VecDeque<StreamId>, pending_window_updates: VecDeque<StreamId>,
/// New streams to be accepted
pending_accept: store::List<B>,
/// Holds frames that are waiting to be read /// Holds frames that are waiting to be read
buffer: Buffer<Bytes>, buffer: Buffer<Bytes>,
@@ -54,6 +57,7 @@ impl<B> Recv<B> where B: Buf {
init_window_sz: config.init_remote_window_sz, init_window_sz: config.init_remote_window_sz,
flow_control: FlowControl::new(config.init_remote_window_sz), flow_control: FlowControl::new(config.init_remote_window_sz),
pending_window_updates: VecDeque::new(), pending_window_updates: VecDeque::new(),
pending_accept: store::List::new(),
buffer: Buffer::new(), buffer: Buffer::new(),
refused: None, refused: None,
_p: PhantomData, _p: PhantomData,
@@ -78,6 +82,19 @@ impl<B> Recv<B> where B: Buf {
Ok(Some(Stream::new(id))) Ok(Some(Stream::new(id)))
} }
pub fn take_request(&mut self, stream: &mut store::Ptr<B>)
-> Result<Request<()>, ConnectionError>
{
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Frame::Headers(frame)) => {
// TODO: This error should probably be caught on receipt of the
// frame vs. now.
Ok(server::Peer::convert_poll_message(frame)?)
}
_ => panic!(),
}
}
pub fn poll_response(&mut self, stream: &mut store::Ptr<B>) pub fn poll_response(&mut self, stream: &mut store::Ptr<B>)
-> Poll<Response<()>, ConnectionError> { -> Poll<Response<()>, ConnectionError> {
// If the buffer is not empty, then the first frame must be a HEADERS // If the buffer is not empty, then the first frame must be a HEADERS
@@ -102,7 +119,7 @@ impl<B> Recv<B> where B: Buf {
pub fn recv_headers<P: Peer>(&mut self, pub fn recv_headers<P: Peer>(&mut self,
frame: frame::Headers, frame: frame::Headers,
stream: &mut store::Ptr<B>) stream: &mut store::Ptr<B>)
-> Result<Option<frame::Headers>, ConnectionError> -> Result<(), ConnectionError>
{ {
let is_initial = stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?; let is_initial = stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?;
@@ -118,13 +135,14 @@ impl<B> Recv<B> where B: Buf {
// Only servers can receive a headers frame that initiates the stream. // Only servers can receive a headers frame that initiates the stream.
// This is verified in `Streams` before calling this function. // This is verified in `Streams` before calling this function.
if P::is_server() { if P::is_server() {
Ok(Some(frame)) self.pending_accept.push(stream);
Ok(())
} else { } else {
// Push the frame onto the recv buffer // Push the frame onto the recv buffer
stream.pending_recv.push_back(&mut self.buffer, frame.into()); stream.pending_recv.push_back(&mut self.buffer, frame.into());
stream.notify_recv(); stream.notify_recv();
Ok(None) Ok(())
} }
} }
@@ -361,6 +379,10 @@ impl<B> Recv<B> where B: Buf {
Ok(().into()) Ok(().into())
} }
pub fn next_incoming(&mut self, store: &mut Store<B>) -> Option<store::Key> {
self.pending_accept.pop(store)
.map(|ptr| ptr.key())
}
pub fn poll_chunk(&mut self, stream: &mut Stream<B>) pub fn poll_chunk(&mut self, stream: &mut Stream<B>)
-> Poll<Option<Chunk>, ConnectionError> -> Poll<Option<Chunk>, ConnectionError>

View File

@@ -1,4 +1,4 @@
use client; use {client, server};
use proto::*; use proto::*;
use super::*; use super::*;
@@ -63,7 +63,7 @@ impl<B> Streams<B>
/// Process inbound headers /// Process inbound headers
pub fn recv_headers<P: Peer>(&mut self, frame: frame::Headers) pub fn recv_headers<P: Peer>(&mut self, frame: frame::Headers)
-> Result<Option<frame::Headers>, ConnectionError> -> Result<(), ConnectionError>
{ {
let id = frame.stream_id(); let id = frame.stream_id();
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
@@ -82,7 +82,7 @@ impl<B> Streams<B>
match try!(me.actions.recv.open::<P>(id)) { match try!(me.actions.recv.open::<P>(id)) {
Some(stream) => e.insert(stream), Some(stream) => e.insert(stream),
None => return Ok(None), None => return Ok(()),
} }
} }
}; };
@@ -222,6 +222,22 @@ impl<B> Streams<B>
*/ */
} }
pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
let key = {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
me.actions.recv.next_incoming(&mut me.store)
};
key.map(|key| {
StreamRef {
inner: self.inner.clone(),
key,
}
})
}
pub fn poll_window_update(&mut self) pub fn poll_window_update(&mut self)
-> Poll<WindowUpdate, ConnectionError> -> Poll<WindowUpdate, ConnectionError>
{ {
@@ -335,6 +351,37 @@ impl<B> StreamRef<B>
}) })
} }
/// Called by the server after the stream is accepted. Given that clients
/// initialize streams by sending HEADERS, the request will always be
/// available.
///
/// # Panics
///
/// This function panics if the request isn't present.
pub fn take_request(&self) -> Result<Request<()>, ConnectionError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.key);
me.actions.recv.take_request(&mut stream)
}
pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool)
-> Result<(), ConnectionError>
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let stream = me.store.resolve(self.key);
let frame = server::Peer::convert_send_message(
stream.id, response, end_of_stream);
me.actions.transition::<server::Peer, _, _>(stream, |actions, stream| {
actions.send.send_headers(frame, stream)
})
}
pub fn poll_response(&mut self) -> Poll<Response<()>, ConnectionError> { pub fn poll_response(&mut self) -> Poll<Response<()>, ConnectionError> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;

View File

@@ -4,7 +4,7 @@ use proto::{self, Connection};
use error::Reason::*; use error::Reason::*;
use http::{self, Request, Response}; use http::{self, Request, Response};
use futures::{Future, Sink, Poll, Async, AsyncSink, IntoFuture}; use futures::{self, Future, Sink, Poll, Async, AsyncSink, IntoFuture};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Bytes, IntoBuf}; use bytes::{Bytes, IntoBuf};
@@ -86,6 +86,43 @@ impl<T, B> Server<T, B>
Handshake { inner: Box::new(handshake) } Handshake { inner: Box::new(handshake) }
} }
pub fn poll_close(&mut self) -> Poll<(), ConnectionError> {
self.connection.poll()
}
}
impl<T, B> futures::Stream for Server<T, B>
where T: AsyncRead + AsyncWrite + 'static,
B: IntoBuf + 'static,
{
type Item = (Request<Body<B>>, Stream<B>);
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> {
// Always try to advance the internal state. Getting NotReady also is
// needed to allow this function to return NotReady.
match self.poll_close()? {
Async::Ready(_) => {
// If the socket is closed, don't return anything
// TODO: drop any pending streams
return Ok(None.into())
}
_ => {}
}
if let Some(inner) = self.connection.next_incoming() {
let (head, _) = inner.take_request()?.into_parts();
let body = Body { inner: inner.clone() };
let request = Request::from_parts(head, body);
let incoming = Stream { inner };
return Ok(Some((request, incoming)).into());
}
Ok(Async::NotReady)
}
} }
impl<T, B> fmt::Debug for Server<T, B> impl<T, B> fmt::Debug for Server<T, B>
@@ -100,6 +137,30 @@ impl<T, B> fmt::Debug for Server<T, B>
} }
} }
// ===== impl Stream =====
impl<B: IntoBuf> Stream<B> {
pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool)
-> Result<(), ConnectionError>
{
self.inner.send_response(response, end_of_stream)
}
/// Send data
pub fn send_data(&mut self, data: B, end_of_stream: bool)
-> Result<(), ConnectionError>
{
self.inner.send_data::<Peer>(data.into_buf(), end_of_stream)
}
/// Send trailers
pub fn send_trailers(&mut self, trailers: ())
-> Result<(), ConnectionError>
{
unimplemented!();
}
}
// ===== impl Flush ===== // ===== impl Flush =====
impl<T> Flush<T> { impl<T> Flush<T> {

View File

@@ -4,7 +4,7 @@ extern crate futures;
extern crate mock_io; extern crate mock_io;
extern crate env_logger; extern crate env_logger;
// use h2::server; use h2::server::Server;
use futures::*; use futures::*;
@@ -13,7 +13,6 @@ const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
#[test] #[test]
fn read_preface_in_multiple_frames() { fn read_preface_in_multiple_frames() {
/*
let _ = ::env_logger::init().unwrap(); let _ = ::env_logger::init().unwrap();
let mock = mock_io::Builder::new() let mock = mock_io::Builder::new()
@@ -25,9 +24,13 @@ fn read_preface_in_multiple_frames() {
.read(SETTINGS_ACK) .read(SETTINGS_ACK)
.build(); .build();
let h2 = server::handshake(mock) let h2 = Server::handshake(mock)
.wait().unwrap(); .wait().unwrap();
assert!(Stream::wait(h2).next().is_none()); assert!(Stream::wait(h2).next().is_none());
*/ }
#[test]
#[ignore]
fn accept_with_pending_connections_after_socket_close() {
} }