From 3a4633d205f4dbebfc6077313b0ecae3658f5a02 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 30 May 2018 22:57:43 +0200 Subject: [PATCH] add SendResponse::poll_reset and SendStream::poll_reset to listen for reset streams (#279) --- src/codec/error.rs | 4 + src/proto/mod.rs | 2 +- src/proto/streams/mod.rs | 1 + src/proto/streams/send.rs | 36 ++++++- src/proto/streams/state.rs | 24 ++++- src/proto/streams/stream.rs | 2 +- src/proto/streams/streams.rs | 13 ++- src/server.rs | 16 +++ src/share.rs | 16 +++ tests/h2-tests/tests/client_request.rs | 38 +++++++ tests/h2-tests/tests/server.rs | 140 +++++++++++++++++++++++++ 11 files changed, 281 insertions(+), 11 deletions(-) diff --git a/src/codec/error.rs b/src/codec/error.rs index 18703ab..b979ae2 100644 --- a/src/codec/error.rs +++ b/src/codec/error.rs @@ -51,6 +51,9 @@ pub enum UserError { /// Request submitted with relative URI. MissingUriSchemeAndAuthority, + + /// Calls `SendResponse::poll_reset` after having called `send_response`. + PollResetAfterSendResponse, } // ===== impl RecvError ===== @@ -130,6 +133,7 @@ impl error::Error for UserError { OverflowedStreamId => "stream ID overflowed", MalformedHeaders => "malformed headers", MissingUriSchemeAndAuthority => "request URI missing scheme and authority", + PollResetAfterSendResponse => "poll_reset after send_response is illegal", } } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 30e69e8..e757019 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -10,7 +10,7 @@ pub(crate) use self::connection::{Config, Connection}; pub(crate) use self::error::Error; pub(crate) use self::peer::{Peer, Dyn as DynPeer}; pub(crate) use self::streams::{Key as StreamKey, StreamRef, OpaqueStreamRef, Streams}; -pub(crate) use self::streams::{Prioritized, Open}; +pub(crate) use self::streams::{PollReset, Prioritized, Open}; use codec::Codec; diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index bbf19fa..2f1467a 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -11,6 +11,7 @@ mod streams; pub(crate) use self::prioritize::Prioritized; pub(crate) use self::recv::Open; +pub(crate) use self::send::PollReset; pub(crate) use self::store::Key; pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams}; diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 9a98d0a..0d1642c 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -1,10 +1,15 @@ use codec::{RecvError, UserError}; -use codec::UserError::*; use frame::{self, Reason}; -use http; -use super::*; +use super::{ + store, Buffer, Codec, Config, Counts, Frame, Prioritize, + Prioritized, Store, Stream, StreamId, StreamIdOverflow, WindowSize, +}; use bytes::Buf; +use http; +use futures::{Async, Poll}; +use futures::task::Task; +use tokio_io::AsyncWrite; use std::{cmp, io}; @@ -21,6 +26,13 @@ pub(super) struct Send { prioritize: Prioritize, } +/// A value to detect which public API has called `poll_reset`. +#[derive(Debug)] +pub(crate) enum PollReset { + AwaitingHeaders, + Streaming, +} + impl Send { /// Create a new `Send` pub fn new(config: &Config) -> Self { @@ -196,7 +208,7 @@ impl Send { ) -> Result<(), UserError> { // TODO: Should this logic be moved into state.rs? if !stream.state.is_send_streaming() { - return Err(UnexpectedFrameType.into()); + return Err(UserError::UnexpectedFrameType); } stream.state.send_close(); @@ -263,6 +275,20 @@ impl Send { } } + pub fn poll_reset( + &self, + stream: &mut Stream, + mode: PollReset, + ) -> Poll { + match stream.state.ensure_reason(mode)? { + Some(reason) => Ok(reason.into()), + None => { + stream.wait_send(); + Ok(Async::NotReady) + }, + } + } + pub fn recv_connection_window_update( &mut self, frame: frame::WindowUpdate, @@ -405,6 +431,6 @@ impl Send { } pub fn ensure_next_stream_id(&self) -> Result { - self.next_stream_id.map_err(|_| OverflowedStreamId) + self.next_stream_id.map_err(|_| UserError::OverflowedStreamId) } } diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 628dd35..36257fe 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -1,7 +1,9 @@ +use std::io; + use codec::{RecvError, UserError}; use codec::UserError::*; use frame::Reason; -use proto; +use proto::{self, PollReset}; use self::Inner::*; use self::Peer::*; @@ -399,8 +401,6 @@ impl State { } pub fn ensure_recv_open(&self) -> Result { - use std::io; - // TODO: Is this correct? match self.inner { Closed(Cause::Proto(reason)) | @@ -412,6 +412,24 @@ impl State { _ => Ok(true), } } + + /// Returns a reason if the stream has been reset. + pub(super) fn ensure_reason(&self, mode: PollReset) -> Result, ::Error> { + match self.inner { + Closed(Cause::Proto(reason)) | + Closed(Cause::LocallyReset(reason)) | + Closed(Cause::Scheduled(reason)) => Ok(Some(reason)), + Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into()).into()), + Open { local: Streaming, .. } | + HalfClosedRemote(Streaming) => match mode { + PollReset::AwaitingHeaders => { + Err(UserError::PollResetAfterSendResponse.into()) + }, + PollReset::Streaming => Ok(None), + }, + _ => Ok(None), + } + } } impl Default for State { diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index fa7079f..39c69a9 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -47,7 +47,7 @@ pub(super) struct Stream { pub buffered_send_data: WindowSize, /// Task tracking additional send capacity (i.e. window updates). - pub send_task: Option, + send_task: Option, /// Frames pending for this stream being sent to the socket pub pending_send: buffer::Deque, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 8b11643..dfd261d 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -708,7 +708,7 @@ where let mut stream = me.store.resolve(*key); trace!("poll_pending_open; stream = {:?}", stream.is_pending_open); if stream.is_pending_open { - stream.send_task = Some(task::current()); + stream.wait_send(); return Ok(Async::NotReady); } } @@ -930,6 +930,17 @@ impl StreamRef { me.actions.send.poll_capacity(&mut stream) } + /// Request to be notified for if a `RST_STREAM` is received for this stream. + pub(crate) fn poll_reset(&mut self, mode: proto::PollReset) -> Poll { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.opaque.key); + + me.actions.send.poll_reset(&mut stream, mode) + .map_err(From::from) + } + pub(crate) fn key(&self) -> store::Key { self.opaque.key } diff --git a/src/server.rs b/src/server.rs index f2d7049..e37f402 100644 --- a/src/server.rs +++ b/src/server.rs @@ -972,6 +972,22 @@ impl SendResponse { self.inner.send_reset(reason) } + /// Polls to be notified when the client resets this stream. + /// + /// If stream is still open, this returns `Ok(Async::NotReady)`, and + /// registers the task to be notified if a `RST_STREAM` is received. + /// + /// If a `RST_STREAM` frame is received for this stream, calling this + /// method will yield the `Reason` for the reset. + /// + /// # Error + /// + /// Calling this method after having called `send_response` will return + /// a user error. + pub fn poll_reset(&mut self) -> Poll { + self.inner.poll_reset(proto::PollReset::AwaitingHeaders) + } + // TODO: Support reserving push promises. } diff --git a/src/share.rs b/src/share.rs index b50625b..dc628b7 100644 --- a/src/share.rs +++ b/src/share.rs @@ -319,6 +319,22 @@ impl SendStream { pub fn send_reset(&mut self, reason: Reason) { self.inner.send_reset(reason) } + + /// Polls to be notified when the client resets this stream. + /// + /// If stream is still open, this returns `Ok(Async::NotReady)`, and + /// registers the task to be notified if a `RST_STREAM` is received. + /// + /// If a `RST_STREAM` frame is received for this stream, calling this + /// method will yield the `Reason` for the reset. + /// + /// # Error + /// + /// If connection sees an error, this returns that error instead of a + /// `Reason`. + pub fn poll_reset(&mut self) -> Poll { + self.inner.poll_reset(proto::PollReset::Streaming) + } } // ===== impl RecvStream ===== diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 48b9764..324e14c 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -926,6 +926,44 @@ fn notify_on_send_capacity() { done_rx.recv().unwrap(); } +#[test] +fn send_stream_poll_reset() { + let _ = ::env_logger::try_init(); + let (io, srv) = mock::new(); + + let srv = srv + .assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("POST", "https://example.com/") + ) + .send_frame(frames::reset(1).refused()) + .close(); + + let client = client::Builder::new() + .handshake::<_, Bytes>(io) + .expect("handshake") + .and_then(|(mut client, conn)| { + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + + let (_response, mut tx) = client.send_request(request, false).unwrap(); + conn.drive(futures::future::poll_fn(move || { + tx.poll_reset() + })) + .map(|(_, reason)| { + assert_eq!(reason, Reason::REFUSED_STREAM); + }) + }); + + client.join(srv).wait().expect("wait"); +} + const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index 5007150..55a8fa2 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -449,3 +449,143 @@ fn too_big_headers_sends_reset_after_431_if_not_eos() { srv.join(client).wait().expect("wait"); } + +#[test] +fn poll_reset() { + let _ = ::env_logger::try_init(); + let (io, client) = mock::new(); + + let client = client + .assert_server_handshake() + .unwrap() + .recv_settings() + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos() + ) + .idle_ms(10) + .send_frame(frames::reset(1).cancel()) + .close(); + + let srv = server::Builder::new() + .handshake::<_, Bytes>(io) + .expect("handshake") + .and_then(|srv| { + srv.into_future() + .expect("server") + .map(|(req, conn)| { + (req.expect("request"), conn) + }) + }) + .and_then(|((_req, mut tx), conn)| { + let conn = conn.into_future() + .map(|(req, _)| assert!(req.is_none(), "no second request")) + .expect("conn"); + conn.join( + futures::future::poll_fn(move || { + tx.poll_reset() + }) + .map(|reason| { + assert_eq!(reason, Reason::CANCEL); + }) + .expect("poll_reset") + ) + }); + + srv.join(client).wait().expect("wait"); +} + +#[test] +fn poll_reset_io_error() { + let _ = ::env_logger::try_init(); + let (io, client) = mock::new(); + + let client = client + .assert_server_handshake() + .unwrap() + .recv_settings() + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos() + ) + .idle_ms(10) + .close(); + + let srv = server::Builder::new() + .handshake::<_, Bytes>(io) + .expect("handshake") + .and_then(|srv| { + srv.into_future() + .expect("server") + .map(|(req, conn)| { + (req.expect("request"), conn) + }) + }) + .and_then(|((_req, mut tx), conn)| { + let conn = conn.into_future() + .map(|(req, _)| assert!(req.is_none(), "no second request")) + .expect("conn"); + conn.join( + futures::future::poll_fn(move || { + tx.poll_reset() + }) + .expect_err("poll_reset should error") + ) + }); + + srv.join(client).wait().expect("wait"); +} + +#[test] +fn poll_reset_after_send_response_is_user_error() { + let _ = ::env_logger::try_init(); + let (io, client) = mock::new(); + + let client = client + .assert_server_handshake() + .unwrap() + .recv_settings() + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos() + ) + .recv_frame( + frames::headers(1) + .response(200) + ) + .recv_frame( + // After the error, our server will drop the handles, + // meaning we receive a RST_STREAM here. + frames::reset(1).cancel() + ) + .idle_ms(10) + .close(); + + let srv = server::Builder::new() + .handshake::<_, Bytes>(io) + .expect("handshake") + .and_then(|srv| { + srv.into_future() + .expect("server") + .map(|(req, conn)| { + (req.expect("request"), conn) + }) + }) + .and_then(|((_req, mut tx), conn)| { + let conn = conn.into_future() + .map(|(req, _)| assert!(req.is_none(), "no second request")) + .expect("conn"); + tx.send_response(Response::new(()), false).expect("response"); + conn.join( + futures::future::poll_fn(move || { + tx.poll_reset() + }) + .expect_err("poll_reset should error") + ) + }); + + srv.join(client).wait().expect("wait"); +}