add SendResponse::poll_reset and SendStream::poll_reset to listen for reset streams (#279)
This commit is contained in:
@@ -51,6 +51,9 @@ pub enum UserError {
|
|||||||
|
|
||||||
/// Request submitted with relative URI.
|
/// Request submitted with relative URI.
|
||||||
MissingUriSchemeAndAuthority,
|
MissingUriSchemeAndAuthority,
|
||||||
|
|
||||||
|
/// Calls `SendResponse::poll_reset` after having called `send_response`.
|
||||||
|
PollResetAfterSendResponse,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl RecvError =====
|
// ===== impl RecvError =====
|
||||||
@@ -130,6 +133,7 @@ impl error::Error for UserError {
|
|||||||
OverflowedStreamId => "stream ID overflowed",
|
OverflowedStreamId => "stream ID overflowed",
|
||||||
MalformedHeaders => "malformed headers",
|
MalformedHeaders => "malformed headers",
|
||||||
MissingUriSchemeAndAuthority => "request URI missing scheme and authority",
|
MissingUriSchemeAndAuthority => "request URI missing scheme and authority",
|
||||||
|
PollResetAfterSendResponse => "poll_reset after send_response is illegal",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ pub(crate) use self::connection::{Config, Connection};
|
|||||||
pub(crate) use self::error::Error;
|
pub(crate) use self::error::Error;
|
||||||
pub(crate) use self::peer::{Peer, Dyn as DynPeer};
|
pub(crate) use self::peer::{Peer, Dyn as DynPeer};
|
||||||
pub(crate) use self::streams::{Key as StreamKey, StreamRef, OpaqueStreamRef, Streams};
|
pub(crate) use self::streams::{Key as StreamKey, StreamRef, OpaqueStreamRef, Streams};
|
||||||
pub(crate) use self::streams::{Prioritized, Open};
|
pub(crate) use self::streams::{PollReset, Prioritized, Open};
|
||||||
|
|
||||||
use codec::Codec;
|
use codec::Codec;
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ mod streams;
|
|||||||
|
|
||||||
pub(crate) use self::prioritize::Prioritized;
|
pub(crate) use self::prioritize::Prioritized;
|
||||||
pub(crate) use self::recv::Open;
|
pub(crate) use self::recv::Open;
|
||||||
|
pub(crate) use self::send::PollReset;
|
||||||
pub(crate) use self::store::Key;
|
pub(crate) use self::store::Key;
|
||||||
pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams};
|
pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams};
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,15 @@
|
|||||||
use codec::{RecvError, UserError};
|
use codec::{RecvError, UserError};
|
||||||
use codec::UserError::*;
|
|
||||||
use frame::{self, Reason};
|
use frame::{self, Reason};
|
||||||
use http;
|
use super::{
|
||||||
use super::*;
|
store, Buffer, Codec, Config, Counts, Frame, Prioritize,
|
||||||
|
Prioritized, Store, Stream, StreamId, StreamIdOverflow, WindowSize,
|
||||||
|
};
|
||||||
|
|
||||||
use bytes::Buf;
|
use bytes::Buf;
|
||||||
|
use http;
|
||||||
|
use futures::{Async, Poll};
|
||||||
|
use futures::task::Task;
|
||||||
|
use tokio_io::AsyncWrite;
|
||||||
|
|
||||||
use std::{cmp, io};
|
use std::{cmp, io};
|
||||||
|
|
||||||
@@ -21,6 +26,13 @@ pub(super) struct Send {
|
|||||||
prioritize: Prioritize,
|
prioritize: Prioritize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A value to detect which public API has called `poll_reset`.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) enum PollReset {
|
||||||
|
AwaitingHeaders,
|
||||||
|
Streaming,
|
||||||
|
}
|
||||||
|
|
||||||
impl Send {
|
impl Send {
|
||||||
/// Create a new `Send`
|
/// Create a new `Send`
|
||||||
pub fn new(config: &Config) -> Self {
|
pub fn new(config: &Config) -> Self {
|
||||||
@@ -196,7 +208,7 @@ impl Send {
|
|||||||
) -> Result<(), UserError> {
|
) -> Result<(), UserError> {
|
||||||
// TODO: Should this logic be moved into state.rs?
|
// TODO: Should this logic be moved into state.rs?
|
||||||
if !stream.state.is_send_streaming() {
|
if !stream.state.is_send_streaming() {
|
||||||
return Err(UnexpectedFrameType.into());
|
return Err(UserError::UnexpectedFrameType);
|
||||||
}
|
}
|
||||||
|
|
||||||
stream.state.send_close();
|
stream.state.send_close();
|
||||||
@@ -263,6 +275,20 @@ impl Send {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn poll_reset(
|
||||||
|
&self,
|
||||||
|
stream: &mut Stream,
|
||||||
|
mode: PollReset,
|
||||||
|
) -> Poll<Reason, ::Error> {
|
||||||
|
match stream.state.ensure_reason(mode)? {
|
||||||
|
Some(reason) => Ok(reason.into()),
|
||||||
|
None => {
|
||||||
|
stream.wait_send();
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn recv_connection_window_update(
|
pub fn recv_connection_window_update(
|
||||||
&mut self,
|
&mut self,
|
||||||
frame: frame::WindowUpdate,
|
frame: frame::WindowUpdate,
|
||||||
@@ -405,6 +431,6 @@ impl Send {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> {
|
pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> {
|
||||||
self.next_stream_id.map_err(|_| OverflowedStreamId)
|
self.next_stream_id.map_err(|_| UserError::OverflowedStreamId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
|
use std::io;
|
||||||
|
|
||||||
use codec::{RecvError, UserError};
|
use codec::{RecvError, UserError};
|
||||||
use codec::UserError::*;
|
use codec::UserError::*;
|
||||||
use frame::Reason;
|
use frame::Reason;
|
||||||
use proto;
|
use proto::{self, PollReset};
|
||||||
|
|
||||||
use self::Inner::*;
|
use self::Inner::*;
|
||||||
use self::Peer::*;
|
use self::Peer::*;
|
||||||
@@ -399,8 +401,6 @@ impl State {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
|
pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
|
||||||
use std::io;
|
|
||||||
|
|
||||||
// TODO: Is this correct?
|
// TODO: Is this correct?
|
||||||
match self.inner {
|
match self.inner {
|
||||||
Closed(Cause::Proto(reason)) |
|
Closed(Cause::Proto(reason)) |
|
||||||
@@ -412,6 +412,24 @@ impl State {
|
|||||||
_ => Ok(true),
|
_ => Ok(true),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns a reason if the stream has been reset.
|
||||||
|
pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, ::Error> {
|
||||||
|
match self.inner {
|
||||||
|
Closed(Cause::Proto(reason)) |
|
||||||
|
Closed(Cause::LocallyReset(reason)) |
|
||||||
|
Closed(Cause::Scheduled(reason)) => Ok(Some(reason)),
|
||||||
|
Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into()).into()),
|
||||||
|
Open { local: Streaming, .. } |
|
||||||
|
HalfClosedRemote(Streaming) => match mode {
|
||||||
|
PollReset::AwaitingHeaders => {
|
||||||
|
Err(UserError::PollResetAfterSendResponse.into())
|
||||||
|
},
|
||||||
|
PollReset::Streaming => Ok(None),
|
||||||
|
},
|
||||||
|
_ => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for State {
|
impl Default for State {
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ pub(super) struct Stream {
|
|||||||
pub buffered_send_data: WindowSize,
|
pub buffered_send_data: WindowSize,
|
||||||
|
|
||||||
/// Task tracking additional send capacity (i.e. window updates).
|
/// Task tracking additional send capacity (i.e. window updates).
|
||||||
pub send_task: Option<task::Task>,
|
send_task: Option<task::Task>,
|
||||||
|
|
||||||
/// Frames pending for this stream being sent to the socket
|
/// Frames pending for this stream being sent to the socket
|
||||||
pub pending_send: buffer::Deque,
|
pub pending_send: buffer::Deque,
|
||||||
|
|||||||
@@ -708,7 +708,7 @@ where
|
|||||||
let mut stream = me.store.resolve(*key);
|
let mut stream = me.store.resolve(*key);
|
||||||
trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
|
trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
|
||||||
if stream.is_pending_open {
|
if stream.is_pending_open {
|
||||||
stream.send_task = Some(task::current());
|
stream.wait_send();
|
||||||
return Ok(Async::NotReady);
|
return Ok(Async::NotReady);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -930,6 +930,17 @@ impl<B> StreamRef<B> {
|
|||||||
me.actions.send.poll_capacity(&mut stream)
|
me.actions.send.poll_capacity(&mut stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Request to be notified for if a `RST_STREAM` is received for this stream.
|
||||||
|
pub(crate) fn poll_reset(&mut self, mode: proto::PollReset) -> Poll<Reason, ::Error> {
|
||||||
|
let mut me = self.opaque.inner.lock().unwrap();
|
||||||
|
let me = &mut *me;
|
||||||
|
|
||||||
|
let mut stream = me.store.resolve(self.opaque.key);
|
||||||
|
|
||||||
|
me.actions.send.poll_reset(&mut stream, mode)
|
||||||
|
.map_err(From::from)
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn key(&self) -> store::Key {
|
pub(crate) fn key(&self) -> store::Key {
|
||||||
self.opaque.key
|
self.opaque.key
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -972,6 +972,22 @@ impl<B: IntoBuf> SendResponse<B> {
|
|||||||
self.inner.send_reset(reason)
|
self.inner.send_reset(reason)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Polls to be notified when the client resets this stream.
|
||||||
|
///
|
||||||
|
/// If stream is still open, this returns `Ok(Async::NotReady)`, and
|
||||||
|
/// registers the task to be notified if a `RST_STREAM` is received.
|
||||||
|
///
|
||||||
|
/// If a `RST_STREAM` frame is received for this stream, calling this
|
||||||
|
/// method will yield the `Reason` for the reset.
|
||||||
|
///
|
||||||
|
/// # Error
|
||||||
|
///
|
||||||
|
/// Calling this method after having called `send_response` will return
|
||||||
|
/// a user error.
|
||||||
|
pub fn poll_reset(&mut self) -> Poll<Reason, ::Error> {
|
||||||
|
self.inner.poll_reset(proto::PollReset::AwaitingHeaders)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Support reserving push promises.
|
// TODO: Support reserving push promises.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
16
src/share.rs
16
src/share.rs
@@ -319,6 +319,22 @@ impl<B: IntoBuf> SendStream<B> {
|
|||||||
pub fn send_reset(&mut self, reason: Reason) {
|
pub fn send_reset(&mut self, reason: Reason) {
|
||||||
self.inner.send_reset(reason)
|
self.inner.send_reset(reason)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Polls to be notified when the client resets this stream.
|
||||||
|
///
|
||||||
|
/// If stream is still open, this returns `Ok(Async::NotReady)`, and
|
||||||
|
/// registers the task to be notified if a `RST_STREAM` is received.
|
||||||
|
///
|
||||||
|
/// If a `RST_STREAM` frame is received for this stream, calling this
|
||||||
|
/// method will yield the `Reason` for the reset.
|
||||||
|
///
|
||||||
|
/// # Error
|
||||||
|
///
|
||||||
|
/// If connection sees an error, this returns that error instead of a
|
||||||
|
/// `Reason`.
|
||||||
|
pub fn poll_reset(&mut self) -> Poll<Reason, ::Error> {
|
||||||
|
self.inner.poll_reset(proto::PollReset::Streaming)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl RecvStream =====
|
// ===== impl RecvStream =====
|
||||||
|
|||||||
@@ -926,6 +926,44 @@ fn notify_on_send_capacity() {
|
|||||||
done_rx.recv().unwrap();
|
done_rx.recv().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn send_stream_poll_reset() {
|
||||||
|
let _ = ::env_logger::try_init();
|
||||||
|
let (io, srv) = mock::new();
|
||||||
|
|
||||||
|
let srv = srv
|
||||||
|
.assert_client_handshake()
|
||||||
|
.unwrap()
|
||||||
|
.recv_settings()
|
||||||
|
.recv_frame(
|
||||||
|
frames::headers(1)
|
||||||
|
.request("POST", "https://example.com/")
|
||||||
|
)
|
||||||
|
.send_frame(frames::reset(1).refused())
|
||||||
|
.close();
|
||||||
|
|
||||||
|
let client = client::Builder::new()
|
||||||
|
.handshake::<_, Bytes>(io)
|
||||||
|
.expect("handshake")
|
||||||
|
.and_then(|(mut client, conn)| {
|
||||||
|
let request = Request::builder()
|
||||||
|
.method(Method::POST)
|
||||||
|
.uri("https://example.com/")
|
||||||
|
.body(())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let (_response, mut tx) = client.send_request(request, false).unwrap();
|
||||||
|
conn.drive(futures::future::poll_fn(move || {
|
||||||
|
tx.poll_reset()
|
||||||
|
}))
|
||||||
|
.map(|(_, reason)| {
|
||||||
|
assert_eq!(reason, Reason::REFUSED_STREAM);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
client.join(srv).wait().expect("wait");
|
||||||
|
}
|
||||||
|
|
||||||
const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
|
const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
|
||||||
const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
|
const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
|
||||||
|
|
||||||
|
|||||||
@@ -449,3 +449,143 @@ fn too_big_headers_sends_reset_after_431_if_not_eos() {
|
|||||||
|
|
||||||
srv.join(client).wait().expect("wait");
|
srv.join(client).wait().expect("wait");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_reset() {
|
||||||
|
let _ = ::env_logger::try_init();
|
||||||
|
let (io, client) = mock::new();
|
||||||
|
|
||||||
|
let client = client
|
||||||
|
.assert_server_handshake()
|
||||||
|
.unwrap()
|
||||||
|
.recv_settings()
|
||||||
|
.send_frame(
|
||||||
|
frames::headers(1)
|
||||||
|
.request("GET", "https://example.com/")
|
||||||
|
.eos()
|
||||||
|
)
|
||||||
|
.idle_ms(10)
|
||||||
|
.send_frame(frames::reset(1).cancel())
|
||||||
|
.close();
|
||||||
|
|
||||||
|
let srv = server::Builder::new()
|
||||||
|
.handshake::<_, Bytes>(io)
|
||||||
|
.expect("handshake")
|
||||||
|
.and_then(|srv| {
|
||||||
|
srv.into_future()
|
||||||
|
.expect("server")
|
||||||
|
.map(|(req, conn)| {
|
||||||
|
(req.expect("request"), conn)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.and_then(|((_req, mut tx), conn)| {
|
||||||
|
let conn = conn.into_future()
|
||||||
|
.map(|(req, _)| assert!(req.is_none(), "no second request"))
|
||||||
|
.expect("conn");
|
||||||
|
conn.join(
|
||||||
|
futures::future::poll_fn(move || {
|
||||||
|
tx.poll_reset()
|
||||||
|
})
|
||||||
|
.map(|reason| {
|
||||||
|
assert_eq!(reason, Reason::CANCEL);
|
||||||
|
})
|
||||||
|
.expect("poll_reset")
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
srv.join(client).wait().expect("wait");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_reset_io_error() {
|
||||||
|
let _ = ::env_logger::try_init();
|
||||||
|
let (io, client) = mock::new();
|
||||||
|
|
||||||
|
let client = client
|
||||||
|
.assert_server_handshake()
|
||||||
|
.unwrap()
|
||||||
|
.recv_settings()
|
||||||
|
.send_frame(
|
||||||
|
frames::headers(1)
|
||||||
|
.request("GET", "https://example.com/")
|
||||||
|
.eos()
|
||||||
|
)
|
||||||
|
.idle_ms(10)
|
||||||
|
.close();
|
||||||
|
|
||||||
|
let srv = server::Builder::new()
|
||||||
|
.handshake::<_, Bytes>(io)
|
||||||
|
.expect("handshake")
|
||||||
|
.and_then(|srv| {
|
||||||
|
srv.into_future()
|
||||||
|
.expect("server")
|
||||||
|
.map(|(req, conn)| {
|
||||||
|
(req.expect("request"), conn)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.and_then(|((_req, mut tx), conn)| {
|
||||||
|
let conn = conn.into_future()
|
||||||
|
.map(|(req, _)| assert!(req.is_none(), "no second request"))
|
||||||
|
.expect("conn");
|
||||||
|
conn.join(
|
||||||
|
futures::future::poll_fn(move || {
|
||||||
|
tx.poll_reset()
|
||||||
|
})
|
||||||
|
.expect_err("poll_reset should error")
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
srv.join(client).wait().expect("wait");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_reset_after_send_response_is_user_error() {
|
||||||
|
let _ = ::env_logger::try_init();
|
||||||
|
let (io, client) = mock::new();
|
||||||
|
|
||||||
|
let client = client
|
||||||
|
.assert_server_handshake()
|
||||||
|
.unwrap()
|
||||||
|
.recv_settings()
|
||||||
|
.send_frame(
|
||||||
|
frames::headers(1)
|
||||||
|
.request("GET", "https://example.com/")
|
||||||
|
.eos()
|
||||||
|
)
|
||||||
|
.recv_frame(
|
||||||
|
frames::headers(1)
|
||||||
|
.response(200)
|
||||||
|
)
|
||||||
|
.recv_frame(
|
||||||
|
// After the error, our server will drop the handles,
|
||||||
|
// meaning we receive a RST_STREAM here.
|
||||||
|
frames::reset(1).cancel()
|
||||||
|
)
|
||||||
|
.idle_ms(10)
|
||||||
|
.close();
|
||||||
|
|
||||||
|
let srv = server::Builder::new()
|
||||||
|
.handshake::<_, Bytes>(io)
|
||||||
|
.expect("handshake")
|
||||||
|
.and_then(|srv| {
|
||||||
|
srv.into_future()
|
||||||
|
.expect("server")
|
||||||
|
.map(|(req, conn)| {
|
||||||
|
(req.expect("request"), conn)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.and_then(|((_req, mut tx), conn)| {
|
||||||
|
let conn = conn.into_future()
|
||||||
|
.map(|(req, _)| assert!(req.is_none(), "no second request"))
|
||||||
|
.expect("conn");
|
||||||
|
tx.send_response(Response::new(()), false).expect("response");
|
||||||
|
conn.join(
|
||||||
|
futures::future::poll_fn(move || {
|
||||||
|
tx.poll_reset()
|
||||||
|
})
|
||||||
|
.expect_err("poll_reset should error")
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
srv.join(client).wait().expect("wait");
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user