Refactor errors internals (#556)

h2::Error now knows whether protocol errors happened because the user
sent them, because it was received from the remote peer, or because
the library itself emitted an error because it detected a protocol
violation.

It also keeps track of whether it came from a RST_STREAM or GO_AWAY
frame, and in the case of the latter, it includes the additional
debug data if any.

Fixes #530
This commit is contained in:
Anthony Ramine
2021-09-28 18:04:35 +02:00
committed by GitHub
parent cab307d2ed
commit 465f0337f8
26 changed files with 464 additions and 432 deletions

View File

@@ -135,9 +135,9 @@
//! [`Builder`]: struct.Builder.html //! [`Builder`]: struct.Builder.html
//! [`Error`]: ../struct.Error.html //! [`Error`]: ../struct.Error.html
use crate::codec::{Codec, RecvError, SendError, UserError}; use crate::codec::{Codec, SendError, UserError};
use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId}; use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
use crate::proto; use crate::proto::{self, Error};
use crate::{FlowControl, PingPong, RecvStream, SendStream}; use crate::{FlowControl, PingPong, RecvStream, SendStream};
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
@@ -1493,7 +1493,7 @@ impl proto::Peer for Peer {
pseudo: Pseudo, pseudo: Pseudo,
fields: HeaderMap, fields: HeaderMap,
stream_id: StreamId, stream_id: StreamId,
) -> Result<Self::Poll, RecvError> { ) -> Result<Self::Poll, Error> {
let mut b = Response::builder(); let mut b = Response::builder();
b = b.version(Version::HTTP_2); b = b.version(Version::HTTP_2);
@@ -1507,10 +1507,7 @@ impl proto::Peer for Peer {
Err(_) => { Err(_) => {
// TODO: Should there be more specialized handling for different // TODO: Should there be more specialized handling for different
// kinds of errors // kinds of errors
return Err(RecvError::Stream { return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
id: stream_id,
reason: Reason::PROTOCOL_ERROR,
});
} }
}; };

View File

@@ -1,26 +1,12 @@
use crate::frame::{Reason, StreamId}; use crate::proto::Error;
use std::{error, fmt, io}; use std::{error, fmt, io};
/// Errors that are received
#[derive(Debug)]
pub enum RecvError {
Connection(Reason),
Stream { id: StreamId, reason: Reason },
Io(io::Error),
}
/// Errors caused by sending a message /// Errors caused by sending a message
#[derive(Debug)] #[derive(Debug)]
pub enum SendError { pub enum SendError {
/// User error Connection(Error),
User(UserError), User(UserError),
/// Connection error prevents sending.
Connection(Reason),
/// I/O error
Io(io::Error),
} }
/// Errors caused by users of the library /// Errors caused by users of the library
@@ -65,47 +51,22 @@ pub enum UserError {
PeerDisabledServerPush, PeerDisabledServerPush,
} }
// ===== impl RecvError =====
impl From<io::Error> for RecvError {
fn from(src: io::Error) -> Self {
RecvError::Io(src)
}
}
impl error::Error for RecvError {}
impl fmt::Display for RecvError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use self::RecvError::*;
match *self {
Connection(ref reason) => reason.fmt(fmt),
Stream { ref reason, .. } => reason.fmt(fmt),
Io(ref e) => e.fmt(fmt),
}
}
}
// ===== impl SendError ===== // ===== impl SendError =====
impl error::Error for SendError {} impl error::Error for SendError {}
impl fmt::Display for SendError { impl fmt::Display for SendError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use self::SendError::*;
match *self { match *self {
User(ref e) => e.fmt(fmt), Self::Connection(ref e) => e.fmt(fmt),
Connection(ref reason) => reason.fmt(fmt), Self::User(ref e) => e.fmt(fmt),
Io(ref e) => e.fmt(fmt),
} }
} }
} }
impl From<io::Error> for SendError { impl From<io::Error> for SendError {
fn from(src: io::Error) -> Self { fn from(src: io::Error) -> Self {
SendError::Io(src) Self::Connection(src.into())
} }
} }

View File

@@ -1,8 +1,8 @@
use crate::codec::RecvError;
use crate::frame::{self, Frame, Kind, Reason}; use crate::frame::{self, Frame, Kind, Reason};
use crate::frame::{ use crate::frame::{
DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE, DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE,
}; };
use crate::proto::Error;
use crate::hpack; use crate::hpack;
@@ -98,8 +98,7 @@ fn decode_frame(
max_header_list_size: usize, max_header_list_size: usize,
partial_inout: &mut Option<Partial>, partial_inout: &mut Option<Partial>,
mut bytes: BytesMut, mut bytes: BytesMut,
) -> Result<Option<Frame>, RecvError> { ) -> Result<Option<Frame>, Error> {
use self::RecvError::*;
let span = tracing::trace_span!("FramedRead::decode_frame", offset = bytes.len()); let span = tracing::trace_span!("FramedRead::decode_frame", offset = bytes.len());
let _e = span.enter(); let _e = span.enter();
@@ -110,7 +109,7 @@ fn decode_frame(
if partial_inout.is_some() && head.kind() != Kind::Continuation { if partial_inout.is_some() && head.kind() != Kind::Continuation {
proto_err!(conn: "expected CONTINUATION, got {:?}", head.kind()); proto_err!(conn: "expected CONTINUATION, got {:?}", head.kind());
return Err(Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
} }
let kind = head.kind(); let kind = head.kind();
@@ -131,14 +130,11 @@ fn decode_frame(
// A stream cannot depend on itself. An endpoint MUST // A stream cannot depend on itself. An endpoint MUST
// treat this as a stream error (Section 5.4.2) of type // treat this as a stream error (Section 5.4.2) of type
// `PROTOCOL_ERROR`. // `PROTOCOL_ERROR`.
return Err(Stream { return Err(Error::library_reset($head.stream_id(), Reason::PROTOCOL_ERROR));
id: $head.stream_id(),
reason: Reason::PROTOCOL_ERROR,
});
}, },
Err(e) => { Err(e) => {
proto_err!(conn: "failed to load frame; err={:?}", e); proto_err!(conn: "failed to load frame; err={:?}", e);
return Err(Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
}; };
@@ -151,14 +147,11 @@ fn decode_frame(
Err(frame::Error::MalformedMessage) => { Err(frame::Error::MalformedMessage) => {
let id = $head.stream_id(); let id = $head.stream_id();
proto_err!(stream: "malformed header block; stream={:?}", id); proto_err!(stream: "malformed header block; stream={:?}", id);
return Err(Stream { return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
id,
reason: Reason::PROTOCOL_ERROR,
});
}, },
Err(e) => { Err(e) => {
proto_err!(conn: "failed HPACK decoding; err={:?}", e); proto_err!(conn: "failed HPACK decoding; err={:?}", e);
return Err(Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
} }
@@ -183,7 +176,7 @@ fn decode_frame(
res.map_err(|e| { res.map_err(|e| {
proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e); proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR) Error::library_go_away(Reason::PROTOCOL_ERROR)
})? })?
.into() .into()
} }
@@ -192,7 +185,7 @@ fn decode_frame(
res.map_err(|e| { res.map_err(|e| {
proto_err!(conn: "failed to load PING frame; err={:?}", e); proto_err!(conn: "failed to load PING frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR) Error::library_go_away(Reason::PROTOCOL_ERROR)
})? })?
.into() .into()
} }
@@ -201,7 +194,7 @@ fn decode_frame(
res.map_err(|e| { res.map_err(|e| {
proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e); proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR) Error::library_go_away(Reason::PROTOCOL_ERROR)
})? })?
.into() .into()
} }
@@ -212,7 +205,7 @@ fn decode_frame(
// TODO: Should this always be connection level? Probably not... // TODO: Should this always be connection level? Probably not...
res.map_err(|e| { res.map_err(|e| {
proto_err!(conn: "failed to load DATA frame; err={:?}", e); proto_err!(conn: "failed to load DATA frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR) Error::library_go_away(Reason::PROTOCOL_ERROR)
})? })?
.into() .into()
} }
@@ -221,7 +214,7 @@ fn decode_frame(
let res = frame::Reset::load(head, &bytes[frame::HEADER_LEN..]); let res = frame::Reset::load(head, &bytes[frame::HEADER_LEN..]);
res.map_err(|e| { res.map_err(|e| {
proto_err!(conn: "failed to load RESET frame; err={:?}", e); proto_err!(conn: "failed to load RESET frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR) Error::library_go_away(Reason::PROTOCOL_ERROR)
})? })?
.into() .into()
} }
@@ -229,7 +222,7 @@ fn decode_frame(
let res = frame::GoAway::load(&bytes[frame::HEADER_LEN..]); let res = frame::GoAway::load(&bytes[frame::HEADER_LEN..]);
res.map_err(|e| { res.map_err(|e| {
proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e); proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR) Error::library_go_away(Reason::PROTOCOL_ERROR)
})? })?
.into() .into()
} }
@@ -238,7 +231,7 @@ fn decode_frame(
if head.stream_id() == 0 { if head.stream_id() == 0 {
// Invalid stream identifier // Invalid stream identifier
proto_err!(conn: "invalid stream ID 0"); proto_err!(conn: "invalid stream ID 0");
return Err(Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
} }
match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) { match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) {
@@ -249,14 +242,11 @@ fn decode_frame(
// `PROTOCOL_ERROR`. // `PROTOCOL_ERROR`.
let id = head.stream_id(); let id = head.stream_id();
proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id); proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id);
return Err(Stream { return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
id,
reason: Reason::PROTOCOL_ERROR,
});
} }
Err(e) => { Err(e) => {
proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e); proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e);
return Err(Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
} }
} }
@@ -267,14 +257,14 @@ fn decode_frame(
Some(partial) => partial, Some(partial) => partial,
None => { None => {
proto_err!(conn: "received unexpected CONTINUATION frame"); proto_err!(conn: "received unexpected CONTINUATION frame");
return Err(Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
} }
}; };
// The stream identifiers must match // The stream identifiers must match
if partial.frame.stream_id() != head.stream_id() { if partial.frame.stream_id() != head.stream_id() {
proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID"); proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID");
return Err(Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
} }
// Extend the buf // Extend the buf
@@ -297,7 +287,7 @@ fn decode_frame(
// the attacker to go away. // the attacker to go away.
if partial.buf.len() + bytes.len() > max_header_list_size { if partial.buf.len() + bytes.len() > max_header_list_size {
proto_err!(conn: "CONTINUATION frame header block size over ignorable limit"); proto_err!(conn: "CONTINUATION frame header block size over ignorable limit");
return Err(Connection(Reason::COMPRESSION_ERROR)); return Err(Error::library_go_away(Reason::COMPRESSION_ERROR).into());
} }
} }
partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]); partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]);
@@ -312,14 +302,11 @@ fn decode_frame(
Err(frame::Error::MalformedMessage) => { Err(frame::Error::MalformedMessage) => {
let id = head.stream_id(); let id = head.stream_id();
proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id); proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id);
return Err(Stream { return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
id,
reason: Reason::PROTOCOL_ERROR,
});
} }
Err(e) => { Err(e) => {
proto_err!(conn: "failed HPACK decoding; err={:?}", e); proto_err!(conn: "failed HPACK decoding; err={:?}", e);
return Err(Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
} }
@@ -343,7 +330,7 @@ impl<T> Stream for FramedRead<T>
where where
T: AsyncRead + Unpin, T: AsyncRead + Unpin,
{ {
type Item = Result<Frame, RecvError>; type Item = Result<Frame, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let span = tracing::trace_span!("FramedRead::poll_next"); let span = tracing::trace_span!("FramedRead::poll_next");
@@ -371,11 +358,11 @@ where
} }
} }
fn map_err(err: io::Error) -> RecvError { fn map_err(err: io::Error) -> Error {
if let io::ErrorKind::InvalidData = err.kind() { if let io::ErrorKind::InvalidData = err.kind() {
if let Some(custom) = err.get_ref() { if let Some(custom) = err.get_ref() {
if custom.is::<LengthDelimitedCodecError>() { if custom.is::<LengthDelimitedCodecError>() {
return RecvError::Connection(Reason::FRAME_SIZE_ERROR); return Error::library_go_away(Reason::FRAME_SIZE_ERROR);
} }
} }
} }

View File

@@ -2,12 +2,13 @@ mod error;
mod framed_read; mod framed_read;
mod framed_write; mod framed_write;
pub use self::error::{RecvError, SendError, UserError}; pub use self::error::{SendError, UserError};
use self::framed_read::FramedRead; use self::framed_read::FramedRead;
use self::framed_write::FramedWrite; use self::framed_write::FramedWrite;
use crate::frame::{self, Data, Frame}; use crate::frame::{self, Data, Frame};
use crate::proto::Error;
use bytes::Buf; use bytes::Buf;
use futures_core::Stream; use futures_core::Stream;
@@ -155,7 +156,7 @@ impl<T, B> Stream for Codec<T, B>
where where
T: AsyncRead + Unpin, T: AsyncRead + Unpin,
{ {
type Item = Result<Frame, RecvError>; type Item = Result<Frame, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx) Pin::new(&mut self.inner).poll_next(cx)

View File

@@ -1,6 +1,8 @@
use crate::codec::{SendError, UserError}; use crate::codec::{SendError, UserError};
use crate::proto; use crate::frame::StreamId;
use crate::proto::{self, Initiator};
use bytes::Bytes;
use std::{error, fmt, io}; use std::{error, fmt, io};
pub use crate::frame::Reason; pub use crate::frame::Reason;
@@ -22,11 +24,14 @@ pub struct Error {
#[derive(Debug)] #[derive(Debug)]
enum Kind { enum Kind {
/// An error caused by an action taken by the remote peer. /// A RST_STREAM frame was received or sent.
/// Reset(StreamId, Reason, Initiator),
/// This is either an error received by the peer or caused by an invalid
/// action taken by the peer (i.e. a protocol error). /// A GO_AWAY frame was received or sent.
Proto(Reason), GoAway(Bytes, Reason, Initiator),
/// The user created an error from a bare Reason.
Reason(Reason),
/// An error resulting from an invalid action taken by the user of this /// An error resulting from an invalid action taken by the user of this
/// library. /// library.
@@ -45,7 +50,7 @@ impl Error {
/// action taken by the peer (i.e. a protocol error). /// action taken by the peer (i.e. a protocol error).
pub fn reason(&self) -> Option<Reason> { pub fn reason(&self) -> Option<Reason> {
match self.kind { match self.kind {
Kind::Proto(reason) => Some(reason), Kind::Reset(_, reason, _) | Kind::GoAway(_, reason, _) => Some(reason),
_ => None, _ => None,
} }
} }
@@ -87,8 +92,13 @@ impl From<proto::Error> for Error {
Error { Error {
kind: match src { kind: match src {
Proto(reason) => Kind::Proto(reason), Reset(stream_id, reason, initiator) => Kind::Reset(stream_id, reason, initiator),
Io(e) => Kind::Io(e), GoAway(debug_data, reason, initiator) => {
Kind::GoAway(debug_data, reason, initiator)
}
Io(kind, inner) => {
Kind::Io(inner.map_or_else(|| kind.into(), |inner| io::Error::new(kind, inner)))
}
}, },
} }
} }
@@ -97,7 +107,7 @@ impl From<proto::Error> for Error {
impl From<Reason> for Error { impl From<Reason> for Error {
fn from(src: Reason) -> Error { fn from(src: Reason) -> Error {
Error { Error {
kind: Kind::Proto(src), kind: Kind::Reason(src),
} }
} }
} }
@@ -106,8 +116,7 @@ impl From<SendError> for Error {
fn from(src: SendError) -> Error { fn from(src: SendError) -> Error {
match src { match src {
SendError::User(e) => e.into(), SendError::User(e) => e.into(),
SendError::Connection(reason) => reason.into(), SendError::Connection(e) => e.into(),
SendError::Io(e) => Error::from_io(e),
} }
} }
} }
@@ -122,13 +131,38 @@ impl From<UserError> for Error {
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use self::Kind::*; let debug_data = match self.kind {
Kind::Reset(_, reason, Initiator::User) => {
match self.kind { return write!(fmt, "stream error sent by user: {}", reason)
Proto(ref reason) => write!(fmt, "protocol error: {}", reason),
User(ref e) => write!(fmt, "user error: {}", e),
Io(ref e) => fmt::Display::fmt(e, fmt),
} }
Kind::Reset(_, reason, Initiator::Library) => {
return write!(fmt, "stream error detected: {}", reason)
}
Kind::Reset(_, reason, Initiator::Remote) => {
return write!(fmt, "stream error received: {}", reason)
}
Kind::GoAway(ref debug_data, reason, Initiator::User) => {
write!(fmt, "connection error sent by user: {}", reason)?;
debug_data
}
Kind::GoAway(ref debug_data, reason, Initiator::Library) => {
write!(fmt, "connection error detected: {}", reason)?;
debug_data
}
Kind::GoAway(ref debug_data, reason, Initiator::Remote) => {
write!(fmt, "connection error received: {}", reason)?;
debug_data
}
Kind::Reason(reason) => return write!(fmt, "protocol error: {}", reason),
Kind::User(ref e) => return write!(fmt, "user error: {}", e),
Kind::Io(ref e) => return e.fmt(fmt),
};
if !debug_data.is_empty() {
write!(fmt, " ({:?})", debug_data)?;
}
Ok(())
} }
} }

View File

@@ -29,8 +29,7 @@ impl GoAway {
self.error_code self.error_code
} }
#[cfg(feature = "unstable")] pub fn debug_data(&self) -> &Bytes {
pub fn debug_data(&self) -> &[u8] {
&self.debug_data &self.debug_data
} }

View File

@@ -2,7 +2,7 @@ use crate::frame::{self, Error, Head, Kind, Reason, StreamId};
use bytes::BufMut; use bytes::BufMut;
#[derive(Debug, Eq, PartialEq)] #[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Reset { pub struct Reset {
stream_id: StreamId, stream_id: StreamId,
error_code: Reason, error_code: Reason,

View File

@@ -104,8 +104,14 @@ macro_rules! ready {
mod codec; mod codec;
mod error; mod error;
mod hpack; mod hpack;
#[cfg(not(feature = "unstable"))]
mod proto; mod proto;
#[cfg(feature = "unstable")]
#[allow(missing_docs)]
pub mod proto;
#[cfg(not(feature = "unstable"))] #[cfg(not(feature = "unstable"))]
mod frame; mod frame;
@@ -125,7 +131,7 @@ pub use crate::error::{Error, Reason};
pub use crate::share::{FlowControl, Ping, PingPong, Pong, RecvStream, SendStream, StreamId}; pub use crate::share::{FlowControl, Ping, PingPong, Pong, RecvStream, SendStream, StreamId};
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
pub use codec::{Codec, RecvError, SendError, UserError}; pub use codec::{Codec, SendError, UserError};
use std::task::Poll; use std::task::Poll;

View File

@@ -1,6 +1,6 @@
use crate::codec::{RecvError, UserError}; use crate::codec::UserError;
use crate::frame::{Reason, StreamId}; use crate::frame::{Reason, StreamId};
use crate::{client, frame, proto, server}; use crate::{client, frame, server};
use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE; use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE;
use crate::proto::*; use crate::proto::*;
@@ -40,7 +40,7 @@ where
/// ///
/// This exists separately from State in order to support /// This exists separately from State in order to support
/// graceful shutdown. /// graceful shutdown.
error: Option<Reason>, error: Option<frame::GoAway>,
/// Pending GOAWAY frames to write. /// Pending GOAWAY frames to write.
go_away: GoAway, go_away: GoAway,
@@ -68,7 +68,7 @@ struct DynConnection<'a, B: Buf = Bytes> {
streams: DynStreams<'a, B>, streams: DynStreams<'a, B>,
error: &'a mut Option<Reason>, error: &'a mut Option<frame::GoAway>,
ping_pong: &'a mut PingPong, ping_pong: &'a mut PingPong,
} }
@@ -88,10 +88,10 @@ enum State {
Open, Open,
/// The codec must be flushed /// The codec must be flushed
Closing(Reason), Closing(Reason, Initiator),
/// In a closed state /// In a closed state
Closed(Reason), Closed(Reason, Initiator),
} }
impl<T, P, B> Connection<T, P, B> impl<T, P, B> Connection<T, P, B>
@@ -161,9 +161,9 @@ where
/// Returns `Ready` when the connection is ready to receive a frame. /// Returns `Ready` when the connection is ready to receive a frame.
/// ///
/// Returns `RecvError` as this may raise errors that are caused by delayed /// Returns `Error` as this may raise errors that are caused by delayed
/// processing of received frames. /// processing of received frames.
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), RecvError>> { fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
let _e = self.inner.span.enter(); let _e = self.inner.span.enter();
let span = tracing::trace_span!("poll_ready"); let span = tracing::trace_span!("poll_ready");
let _e = span.enter(); let _e = span.enter();
@@ -191,26 +191,24 @@ where
self.inner.as_dyn().go_away_from_user(e) self.inner.as_dyn().go_away_from_user(e)
} }
fn take_error(&mut self, ours: Reason) -> Poll<Result<(), proto::Error>> { fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> {
let reason = if let Some(theirs) = self.inner.error.take() { let (debug_data, theirs) = self
.inner
.error
.take()
.as_ref()
.map_or((Bytes::new(), Reason::NO_ERROR), |frame| {
(frame.debug_data().clone(), frame.reason())
});
match (ours, theirs) { match (ours, theirs) {
// If either side reported an error, return that (Reason::NO_ERROR, Reason::NO_ERROR) => return Ok(()),
// to the user. (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)),
(Reason::NO_ERROR, err) | (err, Reason::NO_ERROR) => err,
// If both sides reported an error, give their // If both sides reported an error, give their
// error back to th user. We assume our error // error back to th user. We assume our error
// was a consequence of their error, and less // was a consequence of their error, and less
// important. // important.
(_, theirs) => theirs, (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)),
}
} else {
ours
};
if reason == Reason::NO_ERROR {
Poll::Ready(Ok(()))
} else {
Poll::Ready(Err(proto::Error::Proto(reason)))
} }
} }
@@ -229,7 +227,7 @@ where
} }
/// Advances the internal state of the connection. /// Advances the internal state of the connection.
pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), proto::Error>> { pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
// XXX(eliza): cloning the span is unfortunately necessary here in // XXX(eliza): cloning the span is unfortunately necessary here in
// order to placate the borrow checker — `self` is mutably borrowed by // order to placate the borrow checker — `self` is mutably borrowed by
// `poll2`, which means that we can't borrow `self.span` to enter it. // `poll2`, which means that we can't borrow `self.span` to enter it.
@@ -268,20 +266,22 @@ where
self.inner.as_dyn().handle_poll2_result(result)? self.inner.as_dyn().handle_poll2_result(result)?
} }
State::Closing(reason) => { State::Closing(reason, initiator) => {
tracing::trace!("connection closing after flush"); tracing::trace!("connection closing after flush");
// Flush/shutdown the codec // Flush/shutdown the codec
ready!(self.codec.shutdown(cx))?; ready!(self.codec.shutdown(cx))?;
// Transition the state to error // Transition the state to error
self.inner.state = State::Closed(reason); self.inner.state = State::Closed(reason, initiator);
}
State::Closed(reason, initiator) => {
return Poll::Ready(self.take_error(reason, initiator));
} }
State::Closed(reason) => return self.take_error(reason),
} }
} }
} }
fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), RecvError>> { fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
// This happens outside of the loop to prevent needing to do a clock // This happens outside of the loop to prevent needing to do a clock
// check and then comparison of the queue possibly multiple times a // check and then comparison of the queue possibly multiple times a
// second (and thus, the clock wouldn't have changed enough to matter). // second (and thus, the clock wouldn't have changed enough to matter).
@@ -300,7 +300,7 @@ where
// the same error back to the user. // the same error back to the user.
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} else { } else {
return Poll::Ready(Err(RecvError::Connection(reason))); return Poll::Ready(Err(Error::library_go_away(reason)));
} }
} }
// Only NO_ERROR should be waiting for idle // Only NO_ERROR should be waiting for idle
@@ -384,42 +384,45 @@ where
self.go_away.go_away_from_user(frame); self.go_away.go_away_from_user(frame);
// Notify all streams of reason we're abruptly closing. // Notify all streams of reason we're abruptly closing.
self.streams.recv_err(&proto::Error::Proto(e)); self.streams.handle_error(Error::user_go_away(e));
} }
fn handle_poll2_result(&mut self, result: Result<(), RecvError>) -> Result<(), Error> { fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> {
use crate::codec::RecvError::*;
match result { match result {
// The connection has shutdown normally // The connection has shutdown normally
Ok(()) => { Ok(()) => {
*self.state = State::Closing(Reason::NO_ERROR); *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library);
Ok(()) Ok(())
} }
// Attempting to read a frame resulted in a connection level // Attempting to read a frame resulted in a connection level
// error. This is handled by setting a GOAWAY frame followed by // error. This is handled by setting a GOAWAY frame followed by
// terminating the connection. // terminating the connection.
Err(Connection(e)) => { Err(Error::GoAway(debug_data, reason, initiator)) => {
let e = Error::GoAway(debug_data, reason, initiator);
tracing::debug!(error = ?e, "Connection::poll; connection error"); tracing::debug!(error = ?e, "Connection::poll; connection error");
// We may have already sent a GOAWAY for this error, // We may have already sent a GOAWAY for this error,
// if so, don't send another, just flush and close up. // if so, don't send another, just flush and close up.
if let Some(reason) = self.go_away.going_away_reason() { if self
if reason == e { .go_away
.going_away()
.map_or(false, |frame| frame.reason() == reason)
{
tracing::trace!(" -> already going away"); tracing::trace!(" -> already going away");
*self.state = State::Closing(e); *self.state = State::Closing(reason, initiator);
return Ok(()); return Ok(());
} }
}
// Reset all active streams // Reset all active streams
self.streams.recv_err(&e.into()); self.streams.handle_error(e);
self.go_away_now(e); self.go_away_now(reason);
Ok(()) Ok(())
} }
// Attempting to read a frame resulted in a stream level error. // Attempting to read a frame resulted in a stream level error.
// This is handled by resetting the frame then trying to read // This is handled by resetting the frame then trying to read
// another frame. // another frame.
Err(Stream { id, reason }) => { Err(Error::Reset(id, reason, initiator)) => {
debug_assert_eq!(initiator, Initiator::Library);
tracing::trace!(?id, ?reason, "stream error"); tracing::trace!(?id, ?reason, "stream error");
self.streams.send_reset(id, reason); self.streams.send_reset(id, reason);
Ok(()) Ok(())
@@ -428,12 +431,12 @@ where
// active streams must be reset. // active streams must be reset.
// //
// TODO: Are I/O errors recoverable? // TODO: Are I/O errors recoverable?
Err(Io(e)) => { Err(Error::Io(e, inner)) => {
tracing::debug!(error = ?e, "Connection::poll; IO error"); tracing::debug!(error = ?e, "Connection::poll; IO error");
let e = e.into(); let e = Error::Io(e, inner);
// Reset all active streams // Reset all active streams
self.streams.recv_err(&e); self.streams.handle_error(e.clone());
// Return the error // Return the error
Err(e) Err(e)
@@ -441,7 +444,7 @@ where
} }
} }
fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, RecvError> { fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
use crate::frame::Frame::*; use crate::frame::Frame::*;
match frame { match frame {
Some(Headers(frame)) => { Some(Headers(frame)) => {
@@ -471,7 +474,7 @@ where
// until they are all EOS. Once they are, State should // until they are all EOS. Once they are, State should
// transition to GoAway. // transition to GoAway.
self.streams.recv_go_away(&frame)?; self.streams.recv_go_away(&frame)?;
*self.error = Some(frame.reason()); *self.error = Some(frame);
} }
Some(Ping(frame)) => { Some(Ping(frame)) => {
tracing::trace!(?frame, "recv PING"); tracing::trace!(?frame, "recv PING");

View File

@@ -1,53 +1,87 @@
use crate::codec::{RecvError, SendError}; use crate::codec::SendError;
use crate::frame::Reason; use crate::frame::{Reason, StreamId};
use bytes::Bytes;
use std::fmt;
use std::io; use std::io;
/// Either an H2 reason or an I/O error /// Either an H2 reason or an I/O error
#[derive(Debug)] #[derive(Clone, Debug)]
pub enum Error { pub enum Error {
Proto(Reason), Reset(StreamId, Reason, Initiator),
Io(io::Error), GoAway(Bytes, Reason, Initiator),
Io(io::ErrorKind, Option<String>),
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Initiator {
User,
Library,
Remote,
} }
impl Error { impl Error {
/// Clone the error for internal purposes. pub(crate) fn is_local(&self) -> bool {
///
/// `io::Error` is not `Clone`, so we only copy the `ErrorKind`.
pub(super) fn shallow_clone(&self) -> Error {
match *self { match *self {
Error::Proto(reason) => Error::Proto(reason), Self::Reset(_, _, initiator) | Self::GoAway(_, _, initiator) => initiator.is_local(),
Error::Io(ref io) => Error::Io(io::Error::from(io.kind())), Self::Io(..) => true,
}
}
pub(crate) fn user_go_away(reason: Reason) -> Self {
Self::GoAway(Bytes::new(), reason, Initiator::User)
}
pub(crate) fn library_reset(stream_id: StreamId, reason: Reason) -> Self {
Self::Reset(stream_id, reason, Initiator::Library)
}
pub(crate) fn library_go_away(reason: Reason) -> Self {
Self::GoAway(Bytes::new(), reason, Initiator::Library)
}
pub(crate) fn remote_reset(stream_id: StreamId, reason: Reason) -> Self {
Self::Reset(stream_id, reason, Initiator::Remote)
}
pub(crate) fn remote_go_away(debug_data: Bytes, reason: Reason) -> Self {
Self::GoAway(debug_data, reason, Initiator::Remote)
}
}
impl Initiator {
fn is_local(&self) -> bool {
match *self {
Self::User | Self::Library => true,
Self::Remote => false,
} }
} }
} }
impl From<Reason> for Error { impl fmt::Display for Error {
fn from(src: Reason) -> Self { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
Error::Proto(src) match *self {
Self::Reset(_, reason, _) | Self::GoAway(_, reason, _) => reason.fmt(fmt),
Self::Io(_, Some(ref inner)) => inner.fmt(fmt),
Self::Io(kind, None) => io::Error::from(kind).fmt(fmt),
}
}
}
impl From<io::ErrorKind> for Error {
fn from(src: io::ErrorKind) -> Self {
Error::Io(src.into(), None)
} }
} }
impl From<io::Error> for Error { impl From<io::Error> for Error {
fn from(src: io::Error) -> Self { fn from(src: io::Error) -> Self {
Error::Io(src) Error::Io(src.kind(), src.get_ref().map(|inner| inner.to_string()))
}
}
impl From<Error> for RecvError {
fn from(src: Error) -> RecvError {
match src {
Error::Proto(reason) => RecvError::Connection(reason),
Error::Io(e) => RecvError::Io(e),
}
} }
} }
impl From<Error> for SendError { impl From<Error> for SendError {
fn from(src: Error) -> SendError { fn from(src: Error) -> Self {
match src { Self::Connection(src)
Error::Proto(reason) => SendError::Connection(reason),
Error::Io(e) => SendError::Io(e),
}
} }
} }

View File

@@ -31,7 +31,7 @@ pub(super) struct GoAway {
/// well, and we wouldn't want to save that here to accidentally dump in logs, /// well, and we wouldn't want to save that here to accidentally dump in logs,
/// or waste struct space.) /// or waste struct space.)
#[derive(Debug)] #[derive(Debug)]
struct GoingAway { pub(crate) struct GoingAway {
/// Stores the highest stream ID of a GOAWAY that has been sent. /// Stores the highest stream ID of a GOAWAY that has been sent.
/// ///
/// It's illegal to send a subsequent GOAWAY with a higher ID. /// It's illegal to send a subsequent GOAWAY with a higher ID.
@@ -98,9 +98,9 @@ impl GoAway {
self.is_user_initiated self.is_user_initiated
} }
/// Return the last Reason we've sent. /// Returns the going away info, if any.
pub fn going_away_reason(&self) -> Option<Reason> { pub fn going_away(&self) -> Option<&GoingAway> {
self.going_away.as_ref().map(|g| g.reason) self.going_away.as_ref()
} }
/// Returns if the connection should close now, or wait until idle. /// Returns if the connection should close now, or wait until idle.
@@ -141,7 +141,7 @@ impl GoAway {
return Poll::Ready(Some(Ok(reason))); return Poll::Ready(Some(Ok(reason)));
} else if self.should_close_now() { } else if self.should_close_now() {
return match self.going_away_reason() { return match self.going_away().map(|going_away| going_away.reason) {
Some(reason) => Poll::Ready(Some(Ok(reason))), Some(reason) => Poll::Ready(Some(Ok(reason))),
None => Poll::Ready(None), None => Poll::Ready(None),
}; };
@@ -150,3 +150,9 @@ impl GoAway {
Poll::Ready(None) Poll::Ready(None)
} }
} }
impl GoingAway {
pub(crate) fn reason(&self) -> Reason {
self.reason
}
}

View File

@@ -7,7 +7,7 @@ mod settings;
mod streams; mod streams;
pub(crate) use self::connection::{Config, Connection}; pub(crate) use self::connection::{Config, Connection};
pub(crate) use self::error::Error; pub use self::error::{Error, Initiator};
pub(crate) use self::peer::{Dyn as DynPeer, Peer}; pub(crate) use self::peer::{Dyn as DynPeer, Peer};
pub(crate) use self::ping_pong::UserPings; pub(crate) use self::ping_pong::UserPings;
pub(crate) use self::streams::{DynStreams, OpaqueStreamRef, StreamRef, Streams}; pub(crate) use self::streams::{DynStreams, OpaqueStreamRef, StreamRef, Streams};

View File

@@ -1,7 +1,6 @@
use crate::codec::RecvError;
use crate::error::Reason; use crate::error::Reason;
use crate::frame::{Pseudo, StreamId}; use crate::frame::{Pseudo, StreamId};
use crate::proto::Open; use crate::proto::{Error, Open};
use http::{HeaderMap, Request, Response}; use http::{HeaderMap, Request, Response};
@@ -21,7 +20,7 @@ pub(crate) trait Peer {
pseudo: Pseudo, pseudo: Pseudo,
fields: HeaderMap, fields: HeaderMap,
stream_id: StreamId, stream_id: StreamId,
) -> Result<Self::Poll, RecvError>; ) -> Result<Self::Poll, Error>;
fn is_local_init(id: StreamId) -> bool { fn is_local_init(id: StreamId) -> bool {
assert!(!id.is_zero()); assert!(!id.is_zero());
@@ -61,7 +60,7 @@ impl Dyn {
pseudo: Pseudo, pseudo: Pseudo,
fields: HeaderMap, fields: HeaderMap,
stream_id: StreamId, stream_id: StreamId,
) -> Result<PollMessage, RecvError> { ) -> Result<PollMessage, Error> {
if self.is_server() { if self.is_server() {
crate::server::Peer::convert_poll_message(pseudo, fields, stream_id) crate::server::Peer::convert_poll_message(pseudo, fields, stream_id)
.map(PollMessage::Server) .map(PollMessage::Server)
@@ -72,12 +71,12 @@ impl Dyn {
} }
/// Returns true if the remote peer can initiate a stream with the given ID. /// Returns true if the remote peer can initiate a stream with the given ID.
pub fn ensure_can_open(&self, id: StreamId, mode: Open) -> Result<(), RecvError> { pub fn ensure_can_open(&self, id: StreamId, mode: Open) -> Result<(), Error> {
if self.is_server() { if self.is_server() {
// Ensure that the ID is a valid client initiated ID // Ensure that the ID is a valid client initiated ID
if mode.is_push_promise() || !id.is_client_initiated() { if mode.is_push_promise() || !id.is_client_initiated() {
proto_err!(conn: "cannot open stream {:?} - not client initiated", id); proto_err!(conn: "cannot open stream {:?} - not client initiated", id);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
Ok(()) Ok(())
@@ -85,7 +84,7 @@ impl Dyn {
// Ensure that the ID is a valid server initiated ID // Ensure that the ID is a valid server initiated ID
if !mode.is_push_promise() || !id.is_server_initiated() { if !mode.is_push_promise() || !id.is_server_initiated() {
proto_err!(conn: "cannot open stream {:?} - not server initiated", id); proto_err!(conn: "cannot open stream {:?} - not server initiated", id);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
Ok(()) Ok(())

View File

@@ -1,4 +1,4 @@
use crate::codec::{RecvError, UserError}; use crate::codec::UserError;
use crate::error::Reason; use crate::error::Reason;
use crate::frame; use crate::frame;
use crate::proto::*; use crate::proto::*;
@@ -40,7 +40,7 @@ impl Settings {
frame: frame::Settings, frame: frame::Settings,
codec: &mut Codec<T, B>, codec: &mut Codec<T, B>,
streams: &mut Streams<C, P>, streams: &mut Streams<C, P>,
) -> Result<(), RecvError> ) -> Result<(), Error>
where where
T: AsyncWrite + Unpin, T: AsyncWrite + Unpin,
B: Buf, B: Buf,
@@ -68,7 +68,7 @@ impl Settings {
// We haven't sent any SETTINGS frames to be ACKed, so // We haven't sent any SETTINGS frames to be ACKed, so
// this is very bizarre! Remote is either buggy or malicious. // this is very bizarre! Remote is either buggy or malicious.
proto_err!(conn: "received unexpected settings ack"); proto_err!(conn: "received unexpected settings ack");
Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
} }
} }
} else { } else {
@@ -97,7 +97,7 @@ impl Settings {
cx: &mut Context, cx: &mut Context,
dst: &mut Codec<T, B>, dst: &mut Codec<T, B>,
streams: &mut Streams<C, P>, streams: &mut Streams<C, P>,
) -> Poll<Result<(), RecvError>> ) -> Poll<Result<(), Error>>
where where
T: AsyncWrite + Unpin, T: AsyncWrite + Unpin,
B: Buf, B: Buf,

View File

@@ -791,7 +791,10 @@ impl Prioritize {
}), }),
None => { None => {
if let Some(reason) = stream.state.get_scheduled_reset() { if let Some(reason) = stream.state.get_scheduled_reset() {
stream.state.set_reset(reason); let stream_id = stream.id;
stream
.state
.set_reset(stream_id, reason, Initiator::Library);
let frame = frame::Reset::new(stream.id, reason); let frame = frame::Reset::new(stream.id, reason);
Frame::Reset(frame) Frame::Reset(frame)

View File

@@ -1,7 +1,7 @@
use super::*; use super::*;
use crate::codec::{RecvError, UserError}; use crate::codec::UserError;
use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; use crate::frame::{self, PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use crate::{frame, proto}; use crate::proto::{self, Error};
use std::task::Context; use std::task::Context;
use http::{HeaderMap, Request, Response}; use http::{HeaderMap, Request, Response};
@@ -68,7 +68,7 @@ pub(super) enum Event {
#[derive(Debug)] #[derive(Debug)]
pub(super) enum RecvHeaderBlockError<T> { pub(super) enum RecvHeaderBlockError<T> {
Oversize(T), Oversize(T),
State(RecvError), State(Error),
} }
#[derive(Debug)] #[derive(Debug)]
@@ -124,7 +124,7 @@ impl Recv {
id: StreamId, id: StreamId,
mode: Open, mode: Open,
counts: &mut Counts, counts: &mut Counts,
) -> Result<Option<StreamId>, RecvError> { ) -> Result<Option<StreamId>, Error> {
assert!(self.refused.is_none()); assert!(self.refused.is_none());
counts.peer().ensure_can_open(id, mode)?; counts.peer().ensure_can_open(id, mode)?;
@@ -132,7 +132,7 @@ impl Recv {
let next_id = self.next_stream_id()?; let next_id = self.next_stream_id()?;
if id < next_id { if id < next_id {
proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id); proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
self.next_stream_id = id.next_id(); self.next_stream_id = id.next_id();
@@ -176,11 +176,7 @@ impl Recv {
Ok(v) => v, Ok(v) => v,
Err(()) => { Err(()) => {
proto_err!(stream: "could not parse content-length; stream={:?}", stream.id); proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
return Err(RecvError::Stream { return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
id: stream.id,
reason: Reason::PROTOCOL_ERROR,
}
.into());
} }
}; };
@@ -312,16 +308,13 @@ impl Recv {
&mut self, &mut self,
frame: frame::Headers, frame: frame::Headers,
stream: &mut store::Ptr, stream: &mut store::Ptr,
) -> Result<(), RecvError> { ) -> Result<(), Error> {
// Transition the state // Transition the state
stream.state.recv_close()?; stream.state.recv_close()?;
if stream.ensure_content_length_zero().is_err() { if stream.ensure_content_length_zero().is_err() {
proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id); proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id);
return Err(RecvError::Stream { return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
id: stream.id,
reason: Reason::PROTOCOL_ERROR,
});
} }
let trailers = frame.into_fields(); let trailers = frame.into_fields();
@@ -455,7 +448,7 @@ impl Recv {
&mut self, &mut self,
settings: &frame::Settings, settings: &frame::Settings,
store: &mut Store, store: &mut Store,
) -> Result<(), RecvError> { ) -> Result<(), proto::Error> {
let target = if let Some(val) = settings.initial_window_size() { let target = if let Some(val) = settings.initial_window_size() {
val val
} else { } else {
@@ -502,7 +495,7 @@ impl Recv {
stream stream
.recv_flow .recv_flow
.inc_window(inc) .inc_window(inc)
.map_err(RecvError::Connection)?; .map_err(proto::Error::library_go_away)?;
stream.recv_flow.assign_capacity(inc); stream.recv_flow.assign_capacity(inc);
Ok(()) Ok(())
}) })
@@ -520,11 +513,7 @@ impl Recv {
stream.pending_recv.is_empty() stream.pending_recv.is_empty()
} }
pub fn recv_data( pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
&mut self,
frame: frame::Data,
stream: &mut store::Ptr,
) -> Result<(), RecvError> {
let sz = frame.payload().len(); let sz = frame.payload().len();
// This should have been enforced at the codec::FramedRead layer, so // This should have been enforced at the codec::FramedRead layer, so
@@ -542,7 +531,7 @@ impl Recv {
// Receiving a DATA frame when not expecting one is a protocol // Receiving a DATA frame when not expecting one is a protocol
// error. // error.
proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id); proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
tracing::trace!( tracing::trace!(
@@ -557,7 +546,7 @@ impl Recv {
"recv_data; frame ignored on locally reset {:?} for some time", "recv_data; frame ignored on locally reset {:?} for some time",
stream.id, stream.id,
); );
return self.ignore_data(sz); return Ok(self.ignore_data(sz)?);
} }
// Ensure that there is enough capacity on the connection before acting // Ensure that there is enough capacity on the connection before acting
@@ -573,10 +562,7 @@ impl Recv {
// So, for violating the **stream** window, we can send either a // So, for violating the **stream** window, we can send either a
// stream or connection error. We've opted to send a stream // stream or connection error. We've opted to send a stream
// error. // error.
return Err(RecvError::Stream { return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
id: stream.id,
reason: Reason::FLOW_CONTROL_ERROR,
});
} }
if stream.dec_content_length(frame.payload().len()).is_err() { if stream.dec_content_length(frame.payload().len()).is_err() {
@@ -585,10 +571,7 @@ impl Recv {
stream.id, stream.id,
frame.payload().len(), frame.payload().len(),
); );
return Err(RecvError::Stream { return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
id: stream.id,
reason: Reason::PROTOCOL_ERROR,
});
} }
if frame.is_end_stream() { if frame.is_end_stream() {
@@ -598,15 +581,12 @@ impl Recv {
stream.id, stream.id,
frame.payload().len(), frame.payload().len(),
); );
return Err(RecvError::Stream { return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
id: stream.id,
reason: Reason::PROTOCOL_ERROR,
});
} }
if stream.state.recv_close().is_err() { if stream.state.recv_close().is_err() {
proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id); proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
} }
} }
@@ -625,7 +605,7 @@ impl Recv {
Ok(()) Ok(())
} }
pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), RecvError> { pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
// Ensure that there is enough capacity on the connection... // Ensure that there is enough capacity on the connection...
self.consume_connection_window(sz)?; self.consume_connection_window(sz)?;
@@ -641,14 +621,14 @@ impl Recv {
Ok(()) Ok(())
} }
pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> { pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
if self.flow.window_size() < sz { if self.flow.window_size() < sz {
tracing::debug!( tracing::debug!(
"connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});", "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
self.flow.window_size(), self.flow.window_size(),
sz, sz,
); );
return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR)); return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
} }
// Update connection level flow control // Update connection level flow control
@@ -663,7 +643,7 @@ impl Recv {
&mut self, &mut self,
frame: frame::PushPromise, frame: frame::PushPromise,
stream: &mut store::Ptr, stream: &mut store::Ptr,
) -> Result<(), RecvError> { ) -> Result<(), Error> {
stream.state.reserve_remote()?; stream.state.reserve_remote()?;
if frame.is_over_size() { if frame.is_over_size() {
// A frame is over size if the decoded header block was bigger than // A frame is over size if the decoded header block was bigger than
@@ -682,10 +662,10 @@ impl Recv {
headers frame is over size; promised_id={:?};", headers frame is over size; promised_id={:?};",
frame.promised_id(), frame.promised_id(),
); );
return Err(RecvError::Stream { return Err(Error::library_reset(
id: frame.promised_id(), frame.promised_id(),
reason: Reason::REFUSED_STREAM, Reason::REFUSED_STREAM,
}); ));
} }
let promised_id = frame.promised_id(); let promised_id = frame.promised_id();
@@ -708,10 +688,7 @@ impl Recv {
promised_id, promised_id,
), ),
} }
return Err(RecvError::Stream { return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
id: promised_id,
reason: Reason::PROTOCOL_ERROR,
});
} }
use super::peer::PollMessage::*; use super::peer::PollMessage::*;
@@ -741,18 +718,16 @@ impl Recv {
/// Handle remote sending an explicit RST_STREAM. /// Handle remote sending an explicit RST_STREAM.
pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) {
// Notify the stream // Notify the stream
stream stream.state.recv_reset(frame, stream.is_pending_send);
.state
.recv_reset(frame.reason(), stream.is_pending_send);
stream.notify_send(); stream.notify_send();
stream.notify_recv(); stream.notify_recv();
} }
/// Handle a received error /// Handle a connection-level error
pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream) { pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
// Receive an error // Receive an error
stream.state.recv_err(err); stream.state.handle_error(err);
// If a receiver is waiting, notify it // If a receiver is waiting, notify it
stream.notify_send(); stream.notify_send();
@@ -783,11 +758,11 @@ impl Recv {
self.max_stream_id self.max_stream_id
} }
pub fn next_stream_id(&self) -> Result<StreamId, RecvError> { pub fn next_stream_id(&self) -> Result<StreamId, Error> {
if let Ok(id) = self.next_stream_id { if let Ok(id) = self.next_stream_id {
Ok(id) Ok(id)
} else { } else {
Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
} }
} }
@@ -802,10 +777,10 @@ impl Recv {
} }
/// Returns true if the remote peer can reserve a stream with the given ID. /// Returns true if the remote peer can reserve a stream with the given ID.
pub fn ensure_can_reserve(&self) -> Result<(), RecvError> { pub fn ensure_can_reserve(&self) -> Result<(), Error> {
if !self.is_push_enabled { if !self.is_push_enabled {
proto_err!(conn: "recv_push_promise: push is disabled"); proto_err!(conn: "recv_push_promise: push is disabled");
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
Ok(()) Ok(())
@@ -1092,8 +1067,8 @@ impl Open {
// ===== impl RecvHeaderBlockError ===== // ===== impl RecvHeaderBlockError =====
impl<T> From<RecvError> for RecvHeaderBlockError<T> { impl<T> From<Error> for RecvHeaderBlockError<T> {
fn from(err: RecvError) -> Self { fn from(err: Error) -> Self {
RecvHeaderBlockError::State(err) RecvHeaderBlockError::State(err)
} }
} }

View File

@@ -2,8 +2,9 @@ use super::{
store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId, store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId,
StreamIdOverflow, WindowSize, StreamIdOverflow, WindowSize,
}; };
use crate::codec::{RecvError, UserError}; use crate::codec::UserError;
use crate::frame::{self, Reason}; use crate::frame::{self, Reason};
use crate::proto::{Error, Initiator};
use bytes::Buf; use bytes::Buf;
use http; use http;
@@ -161,6 +162,7 @@ impl Send {
pub fn send_reset<B>( pub fn send_reset<B>(
&mut self, &mut self,
reason: Reason, reason: Reason,
initiator: Initiator,
buffer: &mut Buffer<Frame<B>>, buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr, stream: &mut store::Ptr,
counts: &mut Counts, counts: &mut Counts,
@@ -169,14 +171,16 @@ impl Send {
let is_reset = stream.state.is_reset(); let is_reset = stream.state.is_reset();
let is_closed = stream.state.is_closed(); let is_closed = stream.state.is_closed();
let is_empty = stream.pending_send.is_empty(); let is_empty = stream.pending_send.is_empty();
let stream_id = stream.id;
tracing::trace!( tracing::trace!(
"send_reset(..., reason={:?}, stream={:?}, ..., \ "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \
is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \ is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
state={:?} \ state={:?} \
", ",
reason, reason,
stream.id, initiator,
stream_id,
is_reset, is_reset,
is_closed, is_closed,
is_empty, is_empty,
@@ -187,13 +191,13 @@ impl Send {
// Don't double reset // Don't double reset
tracing::trace!( tracing::trace!(
" -> not sending RST_STREAM ({:?} is already reset)", " -> not sending RST_STREAM ({:?} is already reset)",
stream.id stream_id
); );
return; return;
} }
// Transition the state to reset no matter what. // Transition the state to reset no matter what.
stream.state.set_reset(reason); stream.state.set_reset(stream_id, reason, initiator);
// If closed AND the send queue is flushed, then the stream cannot be // If closed AND the send queue is flushed, then the stream cannot be
// reset explicitly, either. Implicit resets can still be queued. // reset explicitly, either. Implicit resets can still be queued.
@@ -201,7 +205,7 @@ impl Send {
tracing::trace!( tracing::trace!(
" -> not sending explicit RST_STREAM ({:?} was closed \ " -> not sending explicit RST_STREAM ({:?} was closed \
and send queue was flushed)", and send queue was flushed)",
stream.id stream_id
); );
return; return;
} }
@@ -371,7 +375,14 @@ impl Send {
if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
tracing::debug!("recv_stream_window_update !!; err={:?}", e); tracing::debug!("recv_stream_window_update !!; err={:?}", e);
self.send_reset(Reason::FLOW_CONTROL_ERROR, buffer, stream, counts, task); self.send_reset(
Reason::FLOW_CONTROL_ERROR,
Initiator::Library,
buffer,
stream,
counts,
task,
);
return Err(e); return Err(e);
} }
@@ -379,7 +390,7 @@ impl Send {
Ok(()) Ok(())
} }
pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), RecvError> { pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> {
if last_stream_id > self.max_stream_id { if last_stream_id > self.max_stream_id {
// The remote endpoint sent a `GOAWAY` frame indicating a stream // The remote endpoint sent a `GOAWAY` frame indicating a stream
// that we never sent, or that we have already terminated on account // that we never sent, or that we have already terminated on account
@@ -392,14 +403,14 @@ impl Send {
"recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})", "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})",
last_stream_id, self.max_stream_id, last_stream_id, self.max_stream_id,
); );
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
self.max_stream_id = last_stream_id; self.max_stream_id = last_stream_id;
Ok(()) Ok(())
} }
pub fn recv_err<B>( pub fn handle_error<B>(
&mut self, &mut self,
buffer: &mut Buffer<Frame<B>>, buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr, stream: &mut store::Ptr,
@@ -417,7 +428,7 @@ impl Send {
store: &mut Store, store: &mut Store,
counts: &mut Counts, counts: &mut Counts,
task: &mut Option<Waker>, task: &mut Option<Waker>,
) -> Result<(), RecvError> { ) -> Result<(), Error> {
// Applies an update to the remote endpoint's initial window size. // Applies an update to the remote endpoint's initial window size.
// //
// Per RFC 7540 §6.9.2: // Per RFC 7540 §6.9.2:
@@ -480,7 +491,7 @@ impl Send {
// of a stream is reduced? Maybe it should if the capacity // of a stream is reduced? Maybe it should if the capacity
// is reduced to zero, allowing the producer to stop work. // is reduced to zero, allowing the producer to stop work.
Ok::<_, RecvError>(()) Ok::<_, Error>(())
})?; })?;
self.prioritize self.prioritize
@@ -490,7 +501,7 @@ impl Send {
store.for_each(|mut stream| { store.for_each(|mut stream| {
self.recv_stream_window_update(inc, buffer, &mut stream, counts, task) self.recv_stream_window_update(inc, buffer, &mut stream, counts, task)
.map_err(RecvError::Connection) .map_err(Error::library_go_away)
})?; })?;
} }
} }

View File

@@ -1,9 +1,8 @@
use std::io; use std::io;
use crate::codec::UserError::*; use crate::codec::UserError;
use crate::codec::{RecvError, UserError}; use crate::frame::{self, Reason, StreamId};
use crate::frame::{self, Reason}; use crate::proto::{self, Error, Initiator, PollReset};
use crate::proto::{self, PollReset};
use self::Inner::*; use self::Inner::*;
use self::Peer::*; use self::Peer::*;
@@ -53,7 +52,7 @@ pub struct State {
inner: Inner, inner: Inner,
} }
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone)]
enum Inner { enum Inner {
Idle, Idle,
// TODO: these states shouldn't count against concurrency limits: // TODO: these states shouldn't count against concurrency limits:
@@ -71,12 +70,10 @@ enum Peer {
Streaming, Streaming,
} }
#[derive(Debug, Copy, Clone)] #[derive(Debug, Clone)]
enum Cause { enum Cause {
EndStream, EndStream,
Proto(Reason), Error(Error),
LocallyReset(Reason),
Io,
/// This indicates to the connection that a reset frame must be sent out /// This indicates to the connection that a reset frame must be sent out
/// once the send queue has been flushed. /// once the send queue has been flushed.
@@ -85,7 +82,7 @@ enum Cause {
/// - User drops all references to a stream, so we want to CANCEL the it. /// - User drops all references to a stream, so we want to CANCEL the it.
/// - Header block size was too large, so we want to REFUSE, possibly /// - Header block size was too large, so we want to REFUSE, possibly
/// after sending a 431 response frame. /// after sending a 431 response frame.
Scheduled(Reason), ScheduledLibraryReset(Reason),
} }
impl State { impl State {
@@ -123,7 +120,7 @@ impl State {
} }
_ => { _ => {
// All other transitions result in a protocol error // All other transitions result in a protocol error
return Err(UnexpectedFrameType); return Err(UserError::UnexpectedFrameType);
} }
}; };
@@ -133,7 +130,7 @@ impl State {
/// Opens the receive-half of the stream when a HEADERS frame is received. /// Opens the receive-half of the stream when a HEADERS frame is received.
/// ///
/// Returns true if this transitions the state to Open. /// Returns true if this transitions the state to Open.
pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, RecvError> { pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> {
let mut initial = false; let mut initial = false;
let eos = frame.is_end_stream(); let eos = frame.is_end_stream();
@@ -195,10 +192,10 @@ impl State {
HalfClosedLocal(Streaming) HalfClosedLocal(Streaming)
} }
} }
state => { ref state => {
// All other transitions result in a protocol error // All other transitions result in a protocol error
proto_err!(conn: "recv_open: in unexpected state {:?}", state); proto_err!(conn: "recv_open: in unexpected state {:?}", state);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
}; };
@@ -206,15 +203,15 @@ impl State {
} }
/// Transition from Idle -> ReservedRemote /// Transition from Idle -> ReservedRemote
pub fn reserve_remote(&mut self) -> Result<(), RecvError> { pub fn reserve_remote(&mut self) -> Result<(), Error> {
match self.inner { match self.inner {
Idle => { Idle => {
self.inner = ReservedRemote; self.inner = ReservedRemote;
Ok(()) Ok(())
} }
state => { ref state => {
proto_err!(conn: "reserve_remote: in unexpected state {:?}", state); proto_err!(conn: "reserve_remote: in unexpected state {:?}", state);
Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
} }
} }
} }
@@ -231,7 +228,7 @@ impl State {
} }
/// Indicates that the remote side will not send more data to the local. /// Indicates that the remote side will not send more data to the local.
pub fn recv_close(&mut self) -> Result<(), RecvError> { pub fn recv_close(&mut self) -> Result<(), Error> {
match self.inner { match self.inner {
Open { local, .. } => { Open { local, .. } => {
// The remote side will continue to receive data. // The remote side will continue to receive data.
@@ -244,9 +241,9 @@ impl State {
self.inner = Closed(Cause::EndStream); self.inner = Closed(Cause::EndStream);
Ok(()) Ok(())
} }
state => { ref state => {
proto_err!(conn: "recv_close: in unexpected state {:?}", state); proto_err!(conn: "recv_close: in unexpected state {:?}", state);
Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
} }
} }
} }
@@ -254,9 +251,9 @@ impl State {
/// The remote explicitly sent a RST_STREAM. /// The remote explicitly sent a RST_STREAM.
/// ///
/// # Arguments /// # Arguments
/// - `reason`: the reason field of the received RST_STREAM frame. /// - `frame`: the received RST_STREAM frame.
/// - `queued`: true if this stream has frames in the pending send queue. /// - `queued`: true if this stream has frames in the pending send queue.
pub fn recv_reset(&mut self, reason: Reason, queued: bool) { pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) {
match self.inner { match self.inner {
// If the stream is already in a `Closed` state, do nothing, // If the stream is already in a `Closed` state, do nothing,
// provided that there are no frames still in the send queue. // provided that there are no frames still in the send queue.
@@ -275,30 +272,28 @@ impl State {
// In either of these cases, we want to overwrite the stream's // In either of these cases, we want to overwrite the stream's
// previous state with the received RST_STREAM, so that the queue // previous state with the received RST_STREAM, so that the queue
// will be cleared by `Prioritize::pop_frame`. // will be cleared by `Prioritize::pop_frame`.
state => { ref state => {
tracing::trace!( tracing::trace!(
"recv_reset; reason={:?}; state={:?}; queued={:?}", "recv_reset; frame={:?}; state={:?}; queued={:?}",
reason, frame,
state, state,
queued queued
); );
self.inner = Closed(Cause::Proto(reason)); self.inner = Closed(Cause::Error(Error::remote_reset(
frame.stream_id(),
frame.reason(),
)));
} }
} }
} }
/// We noticed a protocol error. /// Handle a connection-level error.
pub fn recv_err(&mut self, err: &proto::Error) { pub fn handle_error(&mut self, err: &proto::Error) {
use crate::proto::Error::*;
match self.inner { match self.inner {
Closed(..) => {} Closed(..) => {}
_ => { _ => {
tracing::trace!("recv_err; err={:?}", err); tracing::trace!("handle_error; err={:?}", err);
self.inner = Closed(match *err { self.inner = Closed(Cause::Error(err.clone()));
Proto(reason) => Cause::LocallyReset(reason),
Io(..) => Cause::Io,
});
} }
} }
} }
@@ -306,9 +301,9 @@ impl State {
pub fn recv_eof(&mut self) { pub fn recv_eof(&mut self) {
match self.inner { match self.inner {
Closed(..) => {} Closed(..) => {}
s => { ref state => {
tracing::trace!("recv_eof; state={:?}", s); tracing::trace!("recv_eof; state={:?}", state);
self.inner = Closed(Cause::Io); self.inner = Closed(Cause::Error(io::ErrorKind::BrokenPipe.into()));
} }
} }
} }
@@ -325,39 +320,39 @@ impl State {
tracing::trace!("send_close: HalfClosedRemote => Closed"); tracing::trace!("send_close: HalfClosedRemote => Closed");
self.inner = Closed(Cause::EndStream); self.inner = Closed(Cause::EndStream);
} }
state => panic!("send_close: unexpected state {:?}", state), ref state => panic!("send_close: unexpected state {:?}", state),
} }
} }
/// Set the stream state to reset locally. /// Set the stream state to reset locally.
pub fn set_reset(&mut self, reason: Reason) { pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) {
self.inner = Closed(Cause::LocallyReset(reason)); self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator)));
} }
/// Set the stream state to a scheduled reset. /// Set the stream state to a scheduled reset.
pub fn set_scheduled_reset(&mut self, reason: Reason) { pub fn set_scheduled_reset(&mut self, reason: Reason) {
debug_assert!(!self.is_closed()); debug_assert!(!self.is_closed());
self.inner = Closed(Cause::Scheduled(reason)); self.inner = Closed(Cause::ScheduledLibraryReset(reason));
} }
pub fn get_scheduled_reset(&self) -> Option<Reason> { pub fn get_scheduled_reset(&self) -> Option<Reason> {
match self.inner { match self.inner {
Closed(Cause::Scheduled(reason)) => Some(reason), Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason),
_ => None, _ => None,
} }
} }
pub fn is_scheduled_reset(&self) -> bool { pub fn is_scheduled_reset(&self) -> bool {
match self.inner { match self.inner {
Closed(Cause::Scheduled(..)) => true, Closed(Cause::ScheduledLibraryReset(..)) => true,
_ => false, _ => false,
} }
} }
pub fn is_local_reset(&self) -> bool { pub fn is_local_reset(&self) -> bool {
match self.inner { match self.inner {
Closed(Cause::LocallyReset(_)) => true, Closed(Cause::Error(ref e)) => e.is_local(),
Closed(Cause::Scheduled(..)) => true, Closed(Cause::ScheduledLibraryReset(..)) => true,
_ => false, _ => false,
} }
} }
@@ -436,10 +431,10 @@ impl State {
pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> { pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
// TODO: Is this correct? // TODO: Is this correct?
match self.inner { match self.inner {
Closed(Cause::Proto(reason)) Closed(Cause::Error(ref e)) => Err(e.clone()),
| Closed(Cause::LocallyReset(reason)) Closed(Cause::ScheduledLibraryReset(reason)) => {
| Closed(Cause::Scheduled(reason)) => Err(proto::Error::Proto(reason)), Err(proto::Error::library_go_away(reason))
Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())), }
Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false), Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
_ => Ok(true), _ => Ok(true),
} }
@@ -448,10 +443,10 @@ impl State {
/// Returns a reason if the stream has been reset. /// Returns a reason if the stream has been reset.
pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> { pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
match self.inner { match self.inner {
Closed(Cause::Proto(reason)) Closed(Cause::Error(Error::Reset(_, reason, _)))
| Closed(Cause::LocallyReset(reason)) | Closed(Cause::Error(Error::GoAway(_, reason, _)))
| Closed(Cause::Scheduled(reason)) => Ok(Some(reason)), | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)),
Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into()).into()), Closed(Cause::Error(ref e)) => Err(e.clone().into()),
Open { Open {
local: Streaming, .. local: Streaming, ..
} }

View File

@@ -1,9 +1,9 @@
use super::recv::RecvHeaderBlockError; use super::recv::RecvHeaderBlockError;
use super::store::{self, Entry, Resolve, Store}; use super::store::{self, Entry, Resolve, Store};
use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId}; use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
use crate::codec::{Codec, RecvError, SendError, UserError}; use crate::codec::{Codec, SendError, UserError};
use crate::frame::{self, Frame, Reason}; use crate::frame::{self, Frame, Reason};
use crate::proto::{peer, Open, Peer, WindowSize}; use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize};
use crate::{client, proto, server}; use crate::{client, proto, server};
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
@@ -180,7 +180,7 @@ where
me.poll_complete(&self.send_buffer, cx, dst) me.poll_complete(&self.send_buffer, cx, dst)
} }
pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> { pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
@@ -198,7 +198,7 @@ where
) )
} }
pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> { pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
@@ -297,30 +297,30 @@ where
} }
impl<B> DynStreams<'_, B> { impl<B> DynStreams<'_, B> {
pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), RecvError> { pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
me.recv_headers(self.peer, &self.send_buffer, frame) me.recv_headers(self.peer, &self.send_buffer, frame)
} }
pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), RecvError> { pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
me.recv_data(self.peer, &self.send_buffer, frame) me.recv_data(self.peer, &self.send_buffer, frame)
} }
pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), RecvError> { pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
me.recv_reset(&self.send_buffer, frame) me.recv_reset(&self.send_buffer, frame)
} }
/// Handle a received error and return the ID of the last processed stream. /// Notify all streams that a connection-level error happened.
pub fn recv_err(&mut self, err: &proto::Error) -> StreamId { pub fn handle_error(&mut self, err: proto::Error) -> StreamId {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
me.recv_err(&self.send_buffer, err) me.handle_error(&self.send_buffer, err)
} }
pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), RecvError> { pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
me.recv_go_away(&self.send_buffer, frame) me.recv_go_away(&self.send_buffer, frame)
} }
@@ -329,12 +329,12 @@ impl<B> DynStreams<'_, B> {
self.inner.lock().unwrap().actions.recv.last_processed_id() self.inner.lock().unwrap().actions.recv.last_processed_id()
} }
pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), RecvError> { pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
me.recv_window_update(&self.send_buffer, frame) me.recv_window_update(&self.send_buffer, frame)
} }
pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), RecvError> { pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
me.recv_push_promise(&self.send_buffer, frame) me.recv_push_promise(&self.send_buffer, frame)
} }
@@ -375,7 +375,7 @@ impl Inner {
peer: peer::Dyn, peer: peer::Dyn,
send_buffer: &SendBuffer<B>, send_buffer: &SendBuffer<B>,
frame: frame::Headers, frame: frame::Headers,
) -> Result<(), RecvError> { ) -> Result<(), Error> {
let id = frame.stream_id(); let id = frame.stream_id();
// The GOAWAY process has begun. All streams with a greater ID than // The GOAWAY process has begun. All streams with a greater ID than
@@ -405,10 +405,7 @@ impl Inner {
"recv_headers for old stream={:?}, sending STREAM_CLOSED", "recv_headers for old stream={:?}, sending STREAM_CLOSED",
id, id,
); );
return Err(RecvError::Stream { return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
id,
reason: Reason::STREAM_CLOSED,
});
} }
} }
@@ -471,10 +468,7 @@ impl Inner {
Ok(()) Ok(())
} else { } else {
Err(RecvError::Stream { Err(Error::library_reset(stream.id, Reason::REFUSED_STREAM))
id: stream.id,
reason: Reason::REFUSED_STREAM,
})
} }
}, },
Err(RecvHeaderBlockError::State(err)) => Err(err), Err(RecvHeaderBlockError::State(err)) => Err(err),
@@ -484,10 +478,7 @@ impl Inner {
// Receiving trailers that don't set EOS is a "malformed" // Receiving trailers that don't set EOS is a "malformed"
// message. Malformed messages are a stream error. // message. Malformed messages are a stream error.
proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id); proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id);
return Err(RecvError::Stream { return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
id: stream.id,
reason: Reason::PROTOCOL_ERROR,
});
} }
actions.recv.recv_trailers(frame, stream) actions.recv.recv_trailers(frame, stream)
@@ -502,7 +493,7 @@ impl Inner {
peer: peer::Dyn, peer: peer::Dyn,
send_buffer: &SendBuffer<B>, send_buffer: &SendBuffer<B>,
frame: frame::Data, frame: frame::Data,
) -> Result<(), RecvError> { ) -> Result<(), Error> {
let id = frame.stream_id(); let id = frame.stream_id();
let stream = match self.store.find_mut(&id) { let stream = match self.store.find_mut(&id) {
@@ -529,14 +520,11 @@ impl Inner {
let sz = sz as WindowSize; let sz = sz as WindowSize;
self.actions.recv.ignore_data(sz)?; self.actions.recv.ignore_data(sz)?;
return Err(RecvError::Stream { return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
id,
reason: Reason::STREAM_CLOSED,
});
} }
proto_err!(conn: "recv_data: stream not found; id={:?}", id); proto_err!(conn: "recv_data: stream not found; id={:?}", id);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
}; };
@@ -551,7 +539,7 @@ impl Inner {
// Any stream error after receiving a DATA frame means // Any stream error after receiving a DATA frame means
// we won't give the data to the user, and so they can't // we won't give the data to the user, and so they can't
// release the capacity. We do it automatically. // release the capacity. We do it automatically.
if let Err(RecvError::Stream { .. }) = res { if let Err(Error::Reset(..)) = res {
actions actions
.recv .recv
.release_connection_capacity(sz as WindowSize, &mut None); .release_connection_capacity(sz as WindowSize, &mut None);
@@ -564,12 +552,12 @@ impl Inner {
&mut self, &mut self,
send_buffer: &SendBuffer<B>, send_buffer: &SendBuffer<B>,
frame: frame::Reset, frame: frame::Reset,
) -> Result<(), RecvError> { ) -> Result<(), Error> {
let id = frame.stream_id(); let id = frame.stream_id();
if id.is_zero() { if id.is_zero() {
proto_err!(conn: "recv_reset: invalid stream ID 0"); proto_err!(conn: "recv_reset: invalid stream ID 0");
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
// The GOAWAY process has begun. All streams with a greater ID than // The GOAWAY process has begun. All streams with a greater ID than
@@ -589,7 +577,7 @@ impl Inner {
// TODO: Are there other error cases? // TODO: Are there other error cases?
self.actions self.actions
.ensure_not_idle(self.counts.peer(), id) .ensure_not_idle(self.counts.peer(), id)
.map_err(RecvError::Connection)?; .map_err(Error::library_go_away)?;
return Ok(()); return Ok(());
} }
@@ -602,7 +590,7 @@ impl Inner {
self.counts.transition(stream, |counts, stream| { self.counts.transition(stream, |counts, stream| {
actions.recv.recv_reset(frame, stream); actions.recv.recv_reset(frame, stream);
actions.send.recv_err(send_buffer, stream, counts); actions.send.handle_error(send_buffer, stream, counts);
assert!(stream.state.is_closed()); assert!(stream.state.is_closed());
Ok(()) Ok(())
}) })
@@ -612,7 +600,7 @@ impl Inner {
&mut self, &mut self,
send_buffer: &SendBuffer<B>, send_buffer: &SendBuffer<B>,
frame: frame::WindowUpdate, frame: frame::WindowUpdate,
) -> Result<(), RecvError> { ) -> Result<(), Error> {
let id = frame.stream_id(); let id = frame.stream_id();
let mut send_buffer = send_buffer.inner.lock().unwrap(); let mut send_buffer = send_buffer.inner.lock().unwrap();
@@ -622,7 +610,7 @@ impl Inner {
self.actions self.actions
.send .send
.recv_connection_window_update(frame, &mut self.store, &mut self.counts) .recv_connection_window_update(frame, &mut self.store, &mut self.counts)
.map_err(RecvError::Connection)?; .map_err(Error::library_go_away)?;
} else { } else {
// The remote may send window updates for streams that the local now // The remote may send window updates for streams that the local now
// considers closed. It's ok... // considers closed. It's ok...
@@ -640,14 +628,14 @@ impl Inner {
} else { } else {
self.actions self.actions
.ensure_not_idle(self.counts.peer(), id) .ensure_not_idle(self.counts.peer(), id)
.map_err(RecvError::Connection)?; .map_err(Error::library_go_away)?;
} }
} }
Ok(()) Ok(())
} }
fn recv_err<B>(&mut self, send_buffer: &SendBuffer<B>, err: &proto::Error) -> StreamId { fn handle_error<B>(&mut self, send_buffer: &SendBuffer<B>, err: proto::Error) -> StreamId {
let actions = &mut self.actions; let actions = &mut self.actions;
let counts = &mut self.counts; let counts = &mut self.counts;
let mut send_buffer = send_buffer.inner.lock().unwrap(); let mut send_buffer = send_buffer.inner.lock().unwrap();
@@ -658,14 +646,14 @@ impl Inner {
self.store self.store
.for_each(|stream| { .for_each(|stream| {
counts.transition(stream, |counts, stream| { counts.transition(stream, |counts, stream| {
actions.recv.recv_err(err, &mut *stream); actions.recv.handle_error(&err, &mut *stream);
actions.send.recv_err(send_buffer, stream, counts); actions.send.handle_error(send_buffer, stream, counts);
Ok::<_, ()>(()) Ok::<_, ()>(())
}) })
}) })
.unwrap(); .unwrap();
actions.conn_error = Some(err.shallow_clone()); actions.conn_error = Some(err);
last_processed_id last_processed_id
} }
@@ -674,7 +662,7 @@ impl Inner {
&mut self, &mut self,
send_buffer: &SendBuffer<B>, send_buffer: &SendBuffer<B>,
frame: &frame::GoAway, frame: &frame::GoAway,
) -> Result<(), RecvError> { ) -> Result<(), Error> {
let actions = &mut self.actions; let actions = &mut self.actions;
let counts = &mut self.counts; let counts = &mut self.counts;
let mut send_buffer = send_buffer.inner.lock().unwrap(); let mut send_buffer = send_buffer.inner.lock().unwrap();
@@ -684,14 +672,14 @@ impl Inner {
actions.send.recv_go_away(last_stream_id)?; actions.send.recv_go_away(last_stream_id)?;
let err = frame.reason().into(); let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason());
self.store self.store
.for_each(|stream| { .for_each(|stream| {
if stream.id > last_stream_id { if stream.id > last_stream_id {
counts.transition(stream, |counts, stream| { counts.transition(stream, |counts, stream| {
actions.recv.recv_err(&err, &mut *stream); actions.recv.handle_error(&err, &mut *stream);
actions.send.recv_err(send_buffer, stream, counts); actions.send.handle_error(send_buffer, stream, counts);
Ok::<_, ()>(()) Ok::<_, ()>(())
}) })
} else { } else {
@@ -709,7 +697,7 @@ impl Inner {
&mut self, &mut self,
send_buffer: &SendBuffer<B>, send_buffer: &SendBuffer<B>,
frame: frame::PushPromise, frame: frame::PushPromise,
) -> Result<(), RecvError> { ) -> Result<(), Error> {
let id = frame.stream_id(); let id = frame.stream_id();
let promised_id = frame.promised_id(); let promised_id = frame.promised_id();
@@ -733,7 +721,7 @@ impl Inner {
} }
None => { None => {
proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state"); proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state");
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
} }
}; };
@@ -826,7 +814,7 @@ impl Inner {
// This handles resetting send state associated with the // This handles resetting send state associated with the
// stream // stream
actions.send.recv_err(send_buffer, stream, counts); actions.send.handle_error(send_buffer, stream, counts);
Ok::<_, ()>(()) Ok::<_, ()>(())
}) })
}) })
@@ -886,8 +874,13 @@ impl Inner {
let stream = self.store.resolve(key); let stream = self.store.resolve(key);
let mut send_buffer = send_buffer.inner.lock().unwrap(); let mut send_buffer = send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer; let send_buffer = &mut *send_buffer;
self.actions self.actions.send_reset(
.send_reset(stream, reason, &mut self.counts, send_buffer); stream,
reason,
Initiator::Library,
&mut self.counts,
send_buffer,
);
} }
} }
@@ -1060,7 +1053,7 @@ impl<B> StreamRef<B> {
let send_buffer = &mut *send_buffer; let send_buffer = &mut *send_buffer;
me.actions me.actions
.send_reset(stream, reason, &mut me.counts, send_buffer); .send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer);
} }
pub fn send_response( pub fn send_response(
@@ -1468,12 +1461,19 @@ impl Actions {
&mut self, &mut self,
stream: store::Ptr, stream: store::Ptr,
reason: Reason, reason: Reason,
initiator: Initiator,
counts: &mut Counts, counts: &mut Counts,
send_buffer: &mut Buffer<Frame<B>>, send_buffer: &mut Buffer<Frame<B>>,
) { ) {
counts.transition(stream, |counts, stream| { counts.transition(stream, |counts, stream| {
self.send self.send.send_reset(
.send_reset(reason, send_buffer, stream, counts, &mut self.task); reason,
initiator,
send_buffer,
stream,
counts,
&mut self.task,
);
self.recv.enqueue_reset_expiration(stream, counts); self.recv.enqueue_reset_expiration(stream, counts);
// if a RecvStream is parked, ensure it's notified // if a RecvStream is parked, ensure it's notified
stream.notify_recv(); stream.notify_recv();
@@ -1485,12 +1485,13 @@ impl Actions {
buffer: &mut Buffer<Frame<B>>, buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr, stream: &mut store::Ptr,
counts: &mut Counts, counts: &mut Counts,
res: Result<(), RecvError>, res: Result<(), Error>,
) -> Result<(), RecvError> { ) -> Result<(), Error> {
if let Err(RecvError::Stream { reason, .. }) = res { if let Err(Error::Reset(stream_id, reason, initiator)) = res {
debug_assert_eq!(stream_id, stream.id);
// Reset the stream. // Reset the stream.
self.send self.send
.send_reset(reason, buffer, stream, counts, &mut self.task); .send_reset(reason, initiator, buffer, stream, counts, &mut self.task);
Ok(()) Ok(())
} else { } else {
res res
@@ -1507,7 +1508,7 @@ impl Actions {
fn ensure_no_conn_error(&self) -> Result<(), proto::Error> { fn ensure_no_conn_error(&self) -> Result<(), proto::Error> {
if let Some(ref err) = self.conn_error { if let Some(ref err) = self.conn_error {
Err(err.shallow_clone()) Err(err.clone())
} else { } else {
Ok(()) Ok(())
} }

View File

@@ -115,9 +115,9 @@
//! [`SendStream`]: ../struct.SendStream.html //! [`SendStream`]: ../struct.SendStream.html
//! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html //! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html
use crate::codec::{Codec, RecvError, UserError}; use crate::codec::{Codec, UserError};
use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId}; use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
use crate::proto::{self, Config, Prioritized}; use crate::proto::{self, Config, Error, Prioritized};
use crate::{FlowControl, PingPong, RecvStream, SendStream}; use crate::{FlowControl, PingPong, RecvStream, SendStream};
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
@@ -1202,7 +1202,7 @@ where
if &PREFACE[self.pos..self.pos + n] != buf.filled() { if &PREFACE[self.pos..self.pos + n] != buf.filled() {
proto_err!(conn: "read_preface: invalid preface"); proto_err!(conn: "read_preface: invalid preface");
// TODO: Should this just write the GO_AWAY frame directly? // TODO: Should this just write the GO_AWAY frame directly?
return Poll::Ready(Err(Reason::PROTOCOL_ERROR.into())); return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
} }
self.pos += n; self.pos += n;
@@ -1388,7 +1388,7 @@ impl proto::Peer for Peer {
pseudo: Pseudo, pseudo: Pseudo,
fields: HeaderMap, fields: HeaderMap,
stream_id: StreamId, stream_id: StreamId,
) -> Result<Self::Poll, RecvError> { ) -> Result<Self::Poll, Error> {
use http::{uri, Version}; use http::{uri, Version};
let mut b = Request::builder(); let mut b = Request::builder();
@@ -1396,10 +1396,7 @@ impl proto::Peer for Peer {
macro_rules! malformed { macro_rules! malformed {
($($arg:tt)*) => {{ ($($arg:tt)*) => {{
tracing::debug!($($arg)*); tracing::debug!($($arg)*);
return Err(RecvError::Stream { return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
id: stream_id,
reason: Reason::PROTOCOL_ERROR,
});
}} }}
} }
@@ -1416,7 +1413,7 @@ impl proto::Peer for Peer {
// Specifying :status for a request is a protocol error // Specifying :status for a request is a protocol error
if pseudo.status.is_some() { if pseudo.status.is_some() {
tracing::trace!("malformed headers: :status field on request; PROTOCOL_ERROR"); tracing::trace!("malformed headers: :status field on request; PROTOCOL_ERROR");
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
} }
// Convert the URI // Convert the URI
@@ -1483,10 +1480,7 @@ impl proto::Peer for Peer {
// TODO: Should there be more specialized handling for different // TODO: Should there be more specialized handling for different
// kinds of errors // kinds of errors
proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id); proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
return Err(RecvError::Stream { return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
id: stream_id,
reason: Reason::PROTOCOL_ERROR,
});
} }
}; };

View File

@@ -1,7 +1,8 @@
use crate::SendFrame; use crate::SendFrame;
use h2::frame::{self, Frame}; use h2::frame::{self, Frame};
use h2::{self, RecvError, SendError}; use h2::proto::Error;
use h2::{self, SendError};
use futures::future::poll_fn; use futures::future::poll_fn;
use futures::{ready, Stream, StreamExt}; use futures::{ready, Stream, StreamExt};
@@ -284,7 +285,7 @@ impl Handle {
} }
impl Stream for Handle { impl Stream for Handle {
type Item = Result<Frame, RecvError>; type Item = Result<Frame, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.codec).poll_next(cx) Pin::new(&mut self.codec).poll_next(cx)

View File

@@ -410,7 +410,11 @@ async fn send_reset_notifies_recv_stream() {
}; };
let rx = async { let rx = async {
let mut body = res.into_body(); let mut body = res.into_body();
body.next().await.unwrap().expect_err("RecvBody"); let err = body.next().await.unwrap().expect_err("RecvBody");
assert_eq!(
err.to_string(),
"stream error sent by user: refused stream before processing any application logic"
);
}; };
// a FuturesUnordered is used on purpose! // a FuturesUnordered is used on purpose!
@@ -672,7 +676,7 @@ async fn sending_request_on_closed_connection() {
}; };
let poll_err = poll_fn(|cx| client.poll_ready(cx)).await.unwrap_err(); let poll_err = poll_fn(|cx| client.poll_ready(cx)).await.unwrap_err();
let msg = "protocol error: unspecific protocol error detected"; let msg = "connection error detected: unspecific protocol error detected";
assert_eq!(poll_err.to_string(), msg); assert_eq!(poll_err.to_string(), msg);
let request = Request::builder() let request = Request::builder()

View File

@@ -236,7 +236,7 @@ async fn read_goaway_with_debug_data() {
let data = poll_frame!(GoAway, codec); let data = poll_frame!(GoAway, codec);
assert_eq!(data.reason(), Reason::ENHANCE_YOUR_CALM); assert_eq!(data.reason(), Reason::ENHANCE_YOUR_CALM);
assert_eq!(data.last_stream_id(), 1); assert_eq!(data.last_stream_id(), 1);
assert_eq!(data.debug_data(), b"too_many_pings"); assert_eq!(&**data.debug_data(), b"too_many_pings");
assert_closed!(codec); assert_closed!(codec);
} }

View File

@@ -217,7 +217,7 @@ async fn recv_data_overflows_connection_window() {
let err = res.unwrap_err(); let err = res.unwrap_err();
assert_eq!( assert_eq!(
err.to_string(), err.to_string(),
"protocol error: flow-control protocol violated" "connection error detected: flow-control protocol violated"
); );
}; };
@@ -227,7 +227,7 @@ async fn recv_data_overflows_connection_window() {
let err = res.unwrap_err(); let err = res.unwrap_err();
assert_eq!( assert_eq!(
err.to_string(), err.to_string(),
"protocol error: flow-control protocol violated" "connection error detected: flow-control protocol violated"
); );
}; };
join(conn, req).await; join(conn, req).await;
@@ -278,7 +278,7 @@ async fn recv_data_overflows_stream_window() {
let err = res.unwrap_err(); let err = res.unwrap_err();
assert_eq!( assert_eq!(
err.to_string(), err.to_string(),
"protocol error: flow-control protocol violated" "stream error detected: flow-control protocol violated"
); );
}; };
@@ -358,7 +358,7 @@ async fn stream_error_release_connection_capacity() {
.expect_err("body"); .expect_err("body");
assert_eq!( assert_eq!(
err.to_string(), err.to_string(),
"protocol error: unspecific protocol error detected" "stream error detected: unspecific protocol error detected"
); );
cap.release_capacity(to_release).expect("release_capacity"); cap.release_capacity(to_release).expect("release_capacity");
}; };

View File

@@ -164,7 +164,7 @@ async fn recv_push_when_push_disabled_is_conn_error() {
let err = res.unwrap_err(); let err = res.unwrap_err();
assert_eq!( assert_eq!(
err.to_string(), err.to_string(),
"protocol error: unspecific protocol error detected" "connection error detected: unspecific protocol error detected"
); );
}; };
@@ -174,7 +174,7 @@ async fn recv_push_when_push_disabled_is_conn_error() {
let err = res.unwrap_err(); let err = res.unwrap_err();
assert_eq!( assert_eq!(
err.to_string(), err.to_string(),
"protocol error: unspecific protocol error detected" "connection error detected: unspecific protocol error detected"
); );
}; };
@@ -380,8 +380,16 @@ async fn recv_push_promise_skipped_stream_id() {
.unwrap(); .unwrap();
let req = async move { let req = async move {
let res = client.send_request(request, true).unwrap().0.await; let err = client
assert!(res.is_err()); .send_request(request, true)
.unwrap()
.0
.await
.unwrap_err();
assert_eq!(
err.to_string(),
"connection error detected: unspecific protocol error detected"
);
}; };
// client should see a protocol error // client should see a protocol error
@@ -390,7 +398,7 @@ async fn recv_push_promise_skipped_stream_id() {
let err = res.unwrap_err(); let err = res.unwrap_err();
assert_eq!( assert_eq!(
err.to_string(), err.to_string(),
"protocol error: unspecific protocol error detected" "connection error detected: unspecific protocol error detected"
); );
}; };
@@ -435,7 +443,11 @@ async fn recv_push_promise_dup_stream_id() {
let req = async move { let req = async move {
let res = client.send_request(request, true).unwrap().0.await; let res = client.send_request(request, true).unwrap().0.await;
assert!(res.is_err()); let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"connection error detected: unspecific protocol error detected"
);
}; };
// client should see a protocol error // client should see a protocol error
@@ -444,7 +456,7 @@ async fn recv_push_promise_dup_stream_id() {
let err = res.unwrap_err(); let err = res.unwrap_err();
assert_eq!( assert_eq!(
err.to_string(), err.to_string(),
"protocol error: unspecific protocol error detected" "connection error detected: unspecific protocol error detected"
); );
}; };

View File

@@ -207,13 +207,19 @@ async fn errors_if_recv_frame_exceeds_max_frame_size() {
let body = resp.into_parts().1; let body = resp.into_parts().1;
let res = util::concat(body).await; let res = util::concat(body).await;
let err = res.unwrap_err(); let err = res.unwrap_err();
assert_eq!(err.to_string(), "protocol error: frame with invalid size"); assert_eq!(
err.to_string(),
"connection error detected: frame with invalid size"
);
}; };
// client should see a conn error // client should see a conn error
let conn = async move { let conn = async move {
let err = h2.await.unwrap_err(); let err = h2.await.unwrap_err();
assert_eq!(err.to_string(), "protocol error: frame with invalid size"); assert_eq!(
err.to_string(),
"connection error detected: frame with invalid size"
);
}; };
join(conn, req).await; join(conn, req).await;
}; };
@@ -321,7 +327,10 @@ async fn recv_goaway_finishes_processed_streams() {
// this request will trigger a goaway // this request will trigger a goaway
let req2 = async move { let req2 = async move {
let err = client.get("https://example.com/").await.unwrap_err(); let err = client.get("https://example.com/").await.unwrap_err();
assert_eq!(err.to_string(), "protocol error: not a result of an error"); assert_eq!(
err.to_string(),
"connection error received: not a result of an error"
);
}; };
join3(async move { h2.await.expect("client") }, req1, req2).await; join3(async move { h2.await.expect("client") }, req1, req2).await;