From 8f2b69c280c4e41dc572fc7612d5b5c324c86208 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 9 Aug 2017 10:36:03 -0700 Subject: [PATCH] Get server working again (mostly) --- examples/client.rs | 37 ++++----------- examples/server.rs | 18 ++++---- src/proto/connection.rs | 25 +++++----- src/proto/streams/recv.rs | 30 ++++++++++-- src/proto/streams/streams.rs | 53 ++++++++++++++++++++-- src/server.rs | 63 +++++++++++++++++++++++++- tests/{server_preface.rs => server.rs} | 11 +++-- 7 files changed, 176 insertions(+), 61 deletions(-) rename tests/{server_preface.rs => server.rs} (82%) diff --git a/examples/client.rs b/examples/client.rs index b371ab8..ce203e7 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,4 +1,3 @@ -/* extern crate h2; extern crate http; extern crate futures; @@ -7,10 +6,9 @@ extern crate tokio_core; extern crate io_dump; extern crate env_logger; -use h2::client; +use h2::client::Client; use http::*; - use futures::*; use tokio_core::reactor; @@ -27,39 +25,24 @@ pub fn main() { let tcp = tcp.then(|res| { let tcp = io_dump::Dump::to_stdout(res.unwrap()); - client::handshake(tcp) + Client::handshake(tcp) }) .then(|res| { - let conn = res.unwrap(); + let mut client = res.unwrap(); println!("sending request"); - let mut request = request::Head::default(); - request.method = method::POST; - request.uri = "https://http2.akamai.com/".parse().unwrap(); - // request.version = version::H2; + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); - conn.send_request(1.into(), request, true) - }) - /* - .then(|res| { - let conn = res.unwrap(); - - conn.send_data(1.into(), "hello".into(), true) - }) - */ - .then(|res| { - let conn = res.unwrap(); - // Get the next message - conn.for_each(|frame| { - println!("RX: {:?}", frame); + let stream = client.request(request, true); + client.join(stream.and_then(|response| { + println!("GOT RESPONSE: {:?}", response); Ok(()) - }) + })) }) ; core.run(tcp).unwrap(); } -*/ - -pub fn main() {} diff --git a/examples/server.rs b/examples/server.rs index 7599aba..80767d4 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,4 +1,3 @@ -/* extern crate h2; extern crate http; extern crate futures; @@ -7,10 +6,9 @@ extern crate tokio_core; extern crate io_dump; extern crate env_logger; -use h2::server; - -use http::{response, status}; +use h2::server::Server; +use http::*; use futures::*; use tokio_core::reactor; @@ -31,12 +29,18 @@ pub fn main() { let server = listener.incoming().for_each(move |(socket, _)| { let socket = io_dump::Dump::to_stdout(socket); - let connection = server::handshake(socket) + let connection = Server::handshake(socket) .then(|res| { let conn = res.unwrap(); println!("H2 connection bound"); + conn.for_each(|(request, stream)| { + println!("GOT request: {:?}", request); + Ok(()) + }) + + /* // Receive a request conn.into_future() .then(|res| { @@ -60,6 +64,7 @@ pub fn main() { conn.send_data(1.into(), "world".into(), true) }) + */ }) .then(|res| { let _ = res.unwrap(); @@ -73,6 +78,3 @@ pub fn main() { core.run(server).unwrap(); } -*/ - -pub fn main() {} diff --git a/src/proto/connection.rs b/src/proto/connection.rs index e8284c4..9c2541e 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,4 +1,4 @@ -use {client, ConnectionError, Frame}; +use {client, server, ConnectionError, Frame}; use HeaderMap; use frame::{self, StreamId}; @@ -122,19 +122,7 @@ impl Connection match frame { Some(Headers(frame)) => { trace!("recv HEADERS; frame={:?}", frame); - - if let Some(frame) = try!(self.streams.recv_headers::

(frame)) { - unimplemented!(); - } - - /* - // Update stream state while ensuring that the headers frame - // can be received. - if let Some(frame) = try!(self.streams.recv_headers(frame)) { - let frame = Self::convert_poll_message(frame)?; - return Ok(Some(frame).into()); - } - */ + try!(self.streams.recv_headers::

(frame)); } Some(Data(frame)) => { trace!("recv DATA; frame={:?}", frame); @@ -269,6 +257,15 @@ impl Connection } } +impl Connection + where T: AsyncRead + AsyncWrite, + B: IntoBuf, +{ + pub fn next_incoming(&mut self) -> Option> { + self.streams.next_incoming() + } +} + /* impl Connection where T: AsyncRead + AsyncWrite, diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 3980a6c..3c4c623 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,4 +1,4 @@ -use {client, frame, ConnectionError}; +use {client, server, frame, ConnectionError}; use proto::*; use super::*; @@ -25,6 +25,9 @@ pub(super) struct Recv { /// TODO: don't use a VecDeque pending_window_updates: VecDeque, + /// New streams to be accepted + pending_accept: store::List, + /// Holds frames that are waiting to be read buffer: Buffer, @@ -54,6 +57,7 @@ impl Recv where B: Buf { init_window_sz: config.init_remote_window_sz, flow_control: FlowControl::new(config.init_remote_window_sz), pending_window_updates: VecDeque::new(), + pending_accept: store::List::new(), buffer: Buffer::new(), refused: None, _p: PhantomData, @@ -78,6 +82,19 @@ impl Recv where B: Buf { Ok(Some(Stream::new(id))) } + pub fn take_request(&mut self, stream: &mut store::Ptr) + -> Result, ConnectionError> + { + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Frame::Headers(frame)) => { + // TODO: This error should probably be caught on receipt of the + // frame vs. now. + Ok(server::Peer::convert_poll_message(frame)?) + } + _ => panic!(), + } + } + pub fn poll_response(&mut self, stream: &mut store::Ptr) -> Poll, ConnectionError> { // If the buffer is not empty, then the first frame must be a HEADERS @@ -102,7 +119,7 @@ impl Recv where B: Buf { pub fn recv_headers(&mut self, frame: frame::Headers, stream: &mut store::Ptr) - -> Result, ConnectionError> + -> Result<(), ConnectionError> { let is_initial = stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?; @@ -118,13 +135,14 @@ impl Recv where B: Buf { // Only servers can receive a headers frame that initiates the stream. // This is verified in `Streams` before calling this function. if P::is_server() { - Ok(Some(frame)) + self.pending_accept.push(stream); + Ok(()) } else { // Push the frame onto the recv buffer stream.pending_recv.push_back(&mut self.buffer, frame.into()); stream.notify_recv(); - Ok(None) + Ok(()) } } @@ -361,6 +379,10 @@ impl Recv where B: Buf { Ok(().into()) } + pub fn next_incoming(&mut self, store: &mut Store) -> Option { + self.pending_accept.pop(store) + .map(|ptr| ptr.key()) + } pub fn poll_chunk(&mut self, stream: &mut Stream) -> Poll, ConnectionError> diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index de23fcb..20d8b3f 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1,4 +1,4 @@ -use client; +use {client, server}; use proto::*; use super::*; @@ -63,7 +63,7 @@ impl Streams /// Process inbound headers pub fn recv_headers(&mut self, frame: frame::Headers) - -> Result, ConnectionError> + -> Result<(), ConnectionError> { let id = frame.stream_id(); let mut me = self.inner.lock().unwrap(); @@ -82,7 +82,7 @@ impl Streams match try!(me.actions.recv.open::

(id)) { Some(stream) => e.insert(stream), - None => return Ok(None), + None => return Ok(()), } } }; @@ -222,6 +222,22 @@ impl Streams */ } + pub fn next_incoming(&mut self) -> Option> { + let key = { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions.recv.next_incoming(&mut me.store) + }; + + key.map(|key| { + StreamRef { + inner: self.inner.clone(), + key, + } + }) + } + pub fn poll_window_update(&mut self) -> Poll { @@ -335,6 +351,37 @@ impl StreamRef }) } + /// Called by the server after the stream is accepted. Given that clients + /// initialize streams by sending HEADERS, the request will always be + /// available. + /// + /// # Panics + /// + /// This function panics if the request isn't present. + pub fn take_request(&self) -> Result, ConnectionError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + me.actions.recv.take_request(&mut stream) + } + + pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool) + -> Result<(), ConnectionError> + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.key); + + let frame = server::Peer::convert_send_message( + stream.id, response, end_of_stream); + + me.actions.transition::(stream, |actions, stream| { + actions.send.send_headers(frame, stream) + }) + } + pub fn poll_response(&mut self) -> Poll, ConnectionError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; diff --git a/src/server.rs b/src/server.rs index bd74a2d..92ffd37 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,7 +4,7 @@ use proto::{self, Connection}; use error::Reason::*; use http::{self, Request, Response}; -use futures::{Future, Sink, Poll, Async, AsyncSink, IntoFuture}; +use futures::{self, Future, Sink, Poll, Async, AsyncSink, IntoFuture}; use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{Bytes, IntoBuf}; @@ -86,6 +86,43 @@ impl Server Handshake { inner: Box::new(handshake) } } + + pub fn poll_close(&mut self) -> Poll<(), ConnectionError> { + self.connection.poll() + } +} + +impl futures::Stream for Server + where T: AsyncRead + AsyncWrite + 'static, + B: IntoBuf + 'static, +{ + type Item = (Request>, Stream); + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + // Always try to advance the internal state. Getting NotReady also is + // needed to allow this function to return NotReady. + match self.poll_close()? { + Async::Ready(_) => { + // If the socket is closed, don't return anything + // TODO: drop any pending streams + return Ok(None.into()) + } + _ => {} + } + + if let Some(inner) = self.connection.next_incoming() { + let (head, _) = inner.take_request()?.into_parts(); + let body = Body { inner: inner.clone() }; + + let request = Request::from_parts(head, body); + let incoming = Stream { inner }; + + return Ok(Some((request, incoming)).into()); + } + + Ok(Async::NotReady) + } } impl fmt::Debug for Server @@ -100,6 +137,30 @@ impl fmt::Debug for Server } } +// ===== impl Stream ===== + +impl Stream { + pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool) + -> Result<(), ConnectionError> + { + self.inner.send_response(response, end_of_stream) + } + + /// Send data + pub fn send_data(&mut self, data: B, end_of_stream: bool) + -> Result<(), ConnectionError> + { + self.inner.send_data::(data.into_buf(), end_of_stream) + } + + /// Send trailers + pub fn send_trailers(&mut self, trailers: ()) + -> Result<(), ConnectionError> + { + unimplemented!(); + } +} + // ===== impl Flush ===== impl Flush { diff --git a/tests/server_preface.rs b/tests/server.rs similarity index 82% rename from tests/server_preface.rs rename to tests/server.rs index a1a15d3..c41dfee 100644 --- a/tests/server_preface.rs +++ b/tests/server.rs @@ -4,7 +4,7 @@ extern crate futures; extern crate mock_io; extern crate env_logger; -// use h2::server; +use h2::server::Server; use futures::*; @@ -13,7 +13,6 @@ const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; #[test] fn read_preface_in_multiple_frames() { - /* let _ = ::env_logger::init().unwrap(); let mock = mock_io::Builder::new() @@ -25,9 +24,13 @@ fn read_preface_in_multiple_frames() { .read(SETTINGS_ACK) .build(); - let h2 = server::handshake(mock) + let h2 = Server::handshake(mock) .wait().unwrap(); assert!(Stream::wait(h2).next().is_none()); - */ +} + +#[test] +#[ignore] +fn accept_with_pending_connections_after_socket_close() { }