Add Graceful Shutdown support

If graceful shutdown is initiated, a GOAWAY of the max stream ID - 1 is
sent, followed by a PING frame, to measure RTT. When the PING is ACKed,
the connection sends a new GOAWAY with the proper last processed stream
ID. From there, once all active streams have completely, the connection
will finally close.
This commit is contained in:
Sean McArthur
2018-03-29 11:54:45 -07:00
parent 01d81b46c2
commit 1c5d4ded50
11 changed files with 483 additions and 92 deletions

View File

@@ -40,6 +40,7 @@ pub enum UserError {
/// The released capacity is larger than claimed capacity. /// The released capacity is larger than claimed capacity.
ReleaseCapacityTooBig, ReleaseCapacityTooBig,
/// The stream ID space is overflowed. /// The stream ID space is overflowed.
/// ///
/// A new connection is needed. /// A new connection is needed.

View File

@@ -11,8 +11,18 @@ pub struct Ping {
payload: Payload, payload: Payload,
} }
// This was just 8 randomly generated bytes. We use something besides just
// zeroes to distinguish this specific PING from any other.
const SHUTDOWN_PAYLOAD: Payload = [0x0b, 0x7b, 0xa2, 0xf0, 0x8b, 0x9b, 0xfe, 0x54];
impl Ping { impl Ping {
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
pub const SHUTDOWN: Payload = SHUTDOWN_PAYLOAD;
#[cfg(not(feature = "unstable"))]
pub(crate) const SHUTDOWN: Payload = SHUTDOWN_PAYLOAD;
pub fn new(payload: Payload) -> Ping { pub fn new(payload: Payload) -> Ping {
Ping { Ping {
ack: false, ack: false,
@@ -31,7 +41,6 @@ impl Ping {
self.ack self.ack
} }
#[cfg(feature = "unstable")]
pub fn payload(&self) -> &Payload { pub fn payload(&self) -> &Payload {
&self.payload &self.payload
} }

View File

@@ -14,6 +14,8 @@ impl StreamId {
pub const MAX: StreamId = StreamId(u32::MAX >> 1); pub const MAX: StreamId = StreamId(u32::MAX >> 1);
pub const MAX_CLIENT: StreamId = StreamId((u32::MAX >> 1) - 1);
/// Parse the stream ID /// Parse the stream ID
#[inline] #[inline]
pub fn parse(buf: &[u8]) -> (StreamId, bool) { pub fn parse(buf: &[u8]) -> (StreamId, bool) {

View File

@@ -10,6 +10,7 @@ use futures::Stream;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::io;
use std::time::Duration; use std::time::Duration;
/// An H2 connection /// An H2 connection
@@ -30,6 +31,9 @@ where
/// Read / write frame values /// Read / write frame values
codec: Codec<T, Prioritized<B::Buf>>, codec: Codec<T, Prioritized<B::Buf>>,
/// Pending GOAWAY frames to write.
go_away: GoAway,
/// Ping/pong handler /// Ping/pong handler
ping_pong: PingPong, ping_pong: PingPong,
@@ -57,11 +61,8 @@ enum State {
/// Currently open in a sane state /// Currently open in a sane state
Open, Open,
/// Waiting to send a GOAWAY frame
GoAway(frame::GoAway),
/// The codec must be flushed /// The codec must be flushed
Flush(Reason), Closing(Reason),
/// In a closed state /// In a closed state
Closed(Reason), Closed(Reason),
@@ -95,6 +96,7 @@ where
state: State::Open, state: State::Open,
error: None, error: None,
codec: codec, codec: codec,
go_away: GoAway::new(),
ping_pong: PingPong::new(), ping_pong: PingPong::new(),
settings: Settings::new(), settings: Settings::new(),
streams: streams, streams: streams,
@@ -111,9 +113,9 @@ where
/// Returns `RecvError` as this may raise errors that are caused by delayed /// Returns `RecvError` as this may raise errors that are caused by delayed
/// processing of received frames. /// processing of received frames.
fn poll_ready(&mut self) -> Poll<(), RecvError> { fn poll_ready(&mut self) -> Poll<(), RecvError> {
// The order of these calls don't really matter too much as only one // The order of these calls don't really matter too much
// should have pending work.
try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); try_ready!(self.ping_pong.send_pending_pong(&mut self.codec));
try_ready!(self.ping_pong.send_pending_ping(&mut self.codec));
try_ready!( try_ready!(
self.settings self.settings
.send_pending_ack(&mut self.codec, &mut self.streams) .send_pending_ack(&mut self.codec, &mut self.streams)
@@ -123,9 +125,47 @@ where
Ok(().into()) Ok(().into())
} }
fn transition_to_go_away(&mut self, id: StreamId, e: Reason) { /// Send any pending GOAWAY frames.
let goaway = frame::GoAway::new(id, e); ///
self.state = State::GoAway(goaway); /// This will return `Some(reason)` if the connection should be closed
/// afterwards. If this is a graceful shutdown, this returns `None`.
fn poll_go_away(&mut self) -> Poll<Option<Reason>, io::Error> {
self.go_away.send_pending_go_away(&mut self.codec)
}
fn go_away(&mut self, id: StreamId, e: Reason) {
let frame = frame::GoAway::new(id, e);
self.streams.send_go_away(id);
self.go_away.go_away(frame);
}
pub fn go_away_now(&mut self, e: Reason) {
let last_processed_id = self.streams.last_processed_id();
let frame = frame::GoAway::new(last_processed_id, e);
self.go_away.go_away_now(frame);
}
fn take_error(&mut self, ours: Reason) -> Poll<(), proto::Error> {
let reason = if let Some(theirs) = self.error.take() {
match (ours, theirs) {
// If either side reported an error, return that
// to the user.
(Reason::NO_ERROR, err) | (err, Reason::NO_ERROR) => err,
// If both sides reported an error, give their
// error back to th user. We assume our error
// was a consequence of their error, and less
// important.
(_, theirs) => theirs,
}
} else {
ours
};
if reason == Reason::NO_ERROR {
Ok(().into())
} else {
Err(proto::Error::Proto(reason))
}
} }
/// Closes the connection by transitioning to a GOAWAY state /// Closes the connection by transitioning to a GOAWAY state
@@ -134,15 +174,10 @@ where
// If we poll() and realize that there are no streams or references // If we poll() and realize that there are no streams or references
// then we can close the connection by transitioning to GOAWAY // then we can close the connection by transitioning to GOAWAY
if self.streams.num_active_streams() == 0 && !self.streams.has_streams_or_other_references() { if self.streams.num_active_streams() == 0 && !self.streams.has_streams_or_other_references() {
self.close_connection(); self.go_away_now(Reason::NO_ERROR);
} }
} }
/// Closes the connection by transitioning to a GOAWAY state
pub fn close_connection(&mut self) {
let last_processed_id = self.streams.last_processed_id();
self.transition_to_go_away(last_processed_id, Reason::NO_ERROR);
}
/// Advances the internal state of the connection. /// Advances the internal state of the connection.
pub fn poll(&mut self) -> Poll<(), proto::Error> { pub fn poll(&mut self) -> Poll<(), proto::Error> {
@@ -155,7 +190,7 @@ where
State::Open => { State::Open => {
match self.poll2() { match self.poll2() {
// The connection has shutdown normally // The connection has shutdown normally
Ok(Async::Ready(())) => return Ok(().into()), Ok(Async::Ready(())) => return self.take_error(Reason::NO_ERROR),
// The connection is not ready to make progress // The connection is not ready to make progress
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
// Ensure all window updates have been sent. // Ensure all window updates have been sent.
@@ -163,10 +198,9 @@ where
// This will also handle flushing `self.codec` // This will also handle flushing `self.codec`
try_ready!(self.streams.poll_complete(&mut self.codec)); try_ready!(self.streams.poll_complete(&mut self.codec));
if self.error.is_some() { if self.error.is_some() || self.go_away.should_close_on_idle() {
if self.streams.num_active_streams() == 0 { if self.streams.num_active_streams() == 0 {
let last_processed_id = self.streams.last_processed_id(); self.go_away_now(Reason::NO_ERROR);
self.transition_to_go_away(last_processed_id, Reason::NO_ERROR);
continue; continue;
} }
} }
@@ -179,9 +213,19 @@ where
Err(Connection(e)) => { Err(Connection(e)) => {
debug!("Connection::poll; err={:?}", e); debug!("Connection::poll; err={:?}", e);
// We may have already sent a GOAWAY for this error,
// if so, don't send another, just flush and close up.
if let Some(reason) = self.go_away.going_away_reason() {
if reason == e {
trace!(" -> already going away");
self.state = State::Closing(e);
continue;
}
}
// Reset all active streams // Reset all active streams
let last_processed_id = self.streams.recv_err(&e.into()); self.streams.recv_err(&e.into());
self.transition_to_go_away(last_processed_id, e); self.go_away_now(e);
}, },
// Attempting to read a frame resulted in a stream level error. // Attempting to read a frame resulted in a stream level error.
// This is handled by resetting the frame then trying to read // This is handled by resetting the frame then trying to read
@@ -207,48 +251,16 @@ where
return Err(e); return Err(e);
}, },
} }
}, }
State::GoAway(frame) => { State::Closing(reason) => {
// Ensure the codec is ready to accept the frame trace!("connection closing after flush, reason={:?}", reason);
try_ready!(self.codec.poll_ready());
// Buffer the GOAWAY frame
self.codec
.buffer(frame.into())
.ok()
.expect("invalid GO_AWAY frame");
// GOAWAY sent, transition the connection to a closed state
// Determine what error code should be returned to user.
let reason = if let Some(theirs) = self.error.take() {
let ours = frame.reason();
match (ours, theirs) {
// If either side reported an error, return that
// to the user.
(Reason::NO_ERROR, err) | (err, Reason::NO_ERROR) => err,
// If both sides reported an error, give their
// error back to th user. We assume our error
// was a consequence of their error, and less
// important.
(_, theirs) => theirs,
}
} else {
frame.reason()
};
self.state = State::Flush(reason);
},
State::Flush(reason) => {
// Flush the codec // Flush the codec
try_ready!(self.codec.flush()); try_ready!(self.codec.flush());
// Transition the state to error // Transition the state to error
self.state = State::Closed(reason); self.state = State::Closed(reason);
}, },
State::Closed(reason) => if let Reason::NO_ERROR = reason { State::Closed(reason) => return self.take_error(reason),
return Ok(Async::Ready(()));
} else {
return Err(reason.into());
},
} }
} }
} }
@@ -263,6 +275,17 @@ where
loop { loop {
// First, ensure that the `Connection` is able to receive a frame // First, ensure that the `Connection` is able to receive a frame
//
// The order here matters:
// - poll_go_away may buffer a graceful shutdown GOAWAY frame
// - If it has, we've also added a PING to be sent in poll_ready
if let Some(reason) = try_ready!(self.poll_go_away()) {
if self.go_away.should_close_now() {
return Err(RecvError::Connection(reason));
}
// Only NO_ERROR should be waiting for idle
debug_assert_eq!(reason, Reason::NO_ERROR, "graceful GOAWAY should be NO_ERROR");
}
try_ready!(self.poll_ready()); try_ready!(self.poll_ready());
match try_ready!(self.codec.poll()) { match try_ready!(self.codec.poll()) {
@@ -292,12 +315,21 @@ where
// but should allow continuing to process current streams // but should allow continuing to process current streams
// until they are all EOS. Once they are, State should // until they are all EOS. Once they are, State should
// transition to GoAway. // transition to GoAway.
self.streams.recv_goaway(&frame); self.streams.recv_go_away(&frame);
self.error = Some(frame.reason()); self.error = Some(frame.reason());
}, },
Some(Ping(frame)) => { Some(Ping(frame)) => {
trace!("recv PING; frame={:?}", frame); trace!("recv PING; frame={:?}", frame);
self.ping_pong.recv_ping(frame); let status = self.ping_pong.recv_ping(frame);
if status.is_shutdown() {
assert!(
self.go_away.is_going_away(),
"received unexpected shutdown ping"
);
let last_processed_id = self.streams.last_processed_id();
self.go_away(last_processed_id, Reason::NO_ERROR);
}
}, },
Some(WindowUpdate(frame)) => { Some(WindowUpdate(frame)) => {
trace!("recv WINDOW_UPDATE; frame={:?}", frame); trace!("recv WINDOW_UPDATE; frame={:?}", frame);
@@ -339,4 +371,29 @@ where
pub fn next_incoming(&mut self) -> Option<StreamRef<B::Buf>> { pub fn next_incoming(&mut self) -> Option<StreamRef<B::Buf>> {
self.streams.next_incoming() self.streams.next_incoming()
} }
// Graceful shutdown only makes sense for server peers.
pub fn go_away_gracefully(&mut self) {
if self.go_away.is_going_away() {
// No reason to start a new one.
return;
}
// According to http://httpwg.org/specs/rfc7540.html#GOAWAY:
//
// > A server that is attempting to gracefully shut down a connection
// > SHOULD send an initial GOAWAY frame with the last stream
// > identifier set to 231-1 and a NO_ERROR code. This signals to the
// > client that a shutdown is imminent and that initiating further
// > requests is prohibited. After allowing time for any in-flight
// > stream creation (at least one round-trip time), the server can
// > send another GOAWAY frame with an updated last stream identifier.
// > This ensures that a connection can be cleanly shut down without
// > losing requests.
self.go_away(StreamId::MAX_CLIENT, Reason::NO_ERROR);
// We take the advice of waiting 1 RTT literally, and wait
// for a pong before proceeding.
self.ping_pong.ping_shutdown();
}
} }

134
src/proto/go_away.rs Normal file
View File

@@ -0,0 +1,134 @@
use codec::Codec;
use frame::{self, Reason, StreamId};
use bytes::Buf;
use futures::{Async, Poll};
use std::io;
use tokio_io::AsyncWrite;
/// Manages our sending of GOAWAY frames.
#[derive(Debug)]
pub(super) struct GoAway {
/// Whether the connection should close now, or wait until idle.
close_now: bool,
/// Records if we've sent any GOAWAY before.
going_away: Option<GoingAway>,
/// A GOAWAY frame that must be buffered in the Codec immediately.
pending: Option<frame::GoAway>,
}
/// Keeps a memory of any GOAWAY frames we've sent before.
///
/// This looks very similar to a `frame::GoAway`, but is a separate type. Why?
/// Mostly for documentation purposes. This type is to record status. If it
/// were a `frame::GoAway`, it might appear like we eventually wanted to
/// serialize it. We **only** want to be able to look up these fields at a
/// later time.
///
/// (Technically, `frame::GoAway` should gain an opaque_debug_data field as
/// well, and we wouldn't want to save that here to accidentally dump in logs,
/// or waste struct space.)
#[derive(Debug)]
struct GoingAway {
/// Stores the highest stream ID of a GOAWAY that has been sent.
///
/// It's illegal to send a subsequent GOAWAY with a higher ID.
last_processed_id: StreamId,
/// Records the error code of any GOAWAY frame sent.
reason: Reason,
}
impl GoAway {
pub fn new() -> Self {
GoAway {
close_now: false,
going_away: None,
pending: None,
}
}
/// Enqueue a GOAWAY frame to be written.
///
/// The connection is expected to continue to run until idle.
pub fn go_away(&mut self, f: frame::GoAway) {
if let Some(ref going_away) = self.going_away {
assert!(
f.last_stream_id() <= going_away.last_processed_id,
"GOAWAY stream IDs shouldn't be higher; \
last_processed_id = {:?}, f.last_stream_id() = {:?}",
going_away.last_processed_id,
f.last_stream_id(),
);
}
self.going_away = Some(GoingAway {
last_processed_id: f.last_stream_id(),
reason: f.reason(),
});
self.pending = Some(f);
}
pub fn go_away_now(&mut self, f: frame::GoAway) {
self.close_now = true;
if let Some(ref going_away) = self.going_away {
// Prevent sending the same GOAWAY twice.
if going_away.last_processed_id == f.last_stream_id()
&& going_away.reason == f.reason() {
return;
}
}
self.go_away(f);
}
/// Return if a GOAWAY has ever been scheduled.
pub fn is_going_away(&self) -> bool {
self.going_away.is_some()
}
/// Return the last Reason we've sent.
pub fn going_away_reason(&self) -> Option<Reason> {
self.going_away
.as_ref()
.map(|g| g.reason)
}
/// Returns if the connection should close now, or wait until idle.
pub fn should_close_now(&self) -> bool {
self.pending.is_none() && self.close_now
}
/// Returns if the connection should be closed when idle.
pub fn should_close_on_idle(&self) -> bool {
!self.close_now && self.going_away
.as_ref()
.map(|g| g.last_processed_id != StreamId::MAX_CLIENT)
.unwrap_or(false)
}
/// Try to write a pending GOAWAY frame to the buffer.
///
/// If a frame is written, the `Reason` of the GOAWAY is returned.
pub fn send_pending_go_away<T, B>(&mut self, dst: &mut Codec<T, B>) -> Poll<Option<Reason>, io::Error>
where
T: AsyncWrite,
B: Buf,
{
if let Some(frame) = self.pending.take() {
if !dst.poll_ready()?.is_ready() {
self.pending = Some(frame);
return Ok(Async::NotReady);
}
let reason = frame.reason();
dst.buffer(frame.into())
.ok()
.expect("invalid GOAWAY frame");
return Ok(Async::Ready(Some(reason)));
}
Ok(Async::Ready(None))
}
}

View File

@@ -1,5 +1,6 @@
mod connection; mod connection;
mod error; mod error;
mod go_away;
mod peer; mod peer;
mod ping_pong; mod ping_pong;
mod settings; mod settings;
@@ -13,6 +14,7 @@ pub(crate) use self::streams::Prioritized;
use codec::Codec; use codec::Codec;
use self::go_away::GoAway;
use self::ping_pong::PingPong; use self::ping_pong::PingPong;
use self::settings::Settings; use self::settings::Settings;

View File

@@ -10,25 +10,67 @@ use tokio_io::AsyncWrite;
/// Acknowledges ping requests from the remote. /// Acknowledges ping requests from the remote.
#[derive(Debug)] #[derive(Debug)]
pub struct PingPong { pub struct PingPong {
sending_pong: Option<PingPayload>, pending_ping: Option<PendingPing>,
pending_pong: Option<PingPayload>,
}
#[derive(Debug)]
struct PendingPing {
payload: PingPayload,
sent: bool,
}
/// Status returned from `PingPong::recv_ping`.
#[derive(Debug)]
pub(crate) enum ReceivedPing {
MustAck,
Unknown,
Shutdown,
} }
impl PingPong { impl PingPong {
pub fn new() -> Self { pub fn new() -> Self {
PingPong { PingPong {
sending_pong: None, pending_ping: None,
pending_pong: None,
} }
} }
pub fn ping_shutdown(&mut self) {
assert!(self.pending_ping.is_none());
self.pending_ping = Some(PendingPing {
payload: Ping::SHUTDOWN,
sent: false,
});
}
/// Process a ping /// Process a ping
pub fn recv_ping(&mut self, ping: Ping) { pub(crate) fn recv_ping(&mut self, ping: Ping) -> ReceivedPing {
// The caller should always check that `send_pongs` returns ready before // The caller should always check that `send_pongs` returns ready before
// calling `recv_ping`. // calling `recv_ping`.
assert!(self.sending_pong.is_none()); assert!(self.pending_pong.is_none());
if !ping.is_ack() { if ping.is_ack() {
if let Some(pending) = self.pending_ping.take() {
if &pending.payload == ping.payload() {
trace!("recv PING ack");
return ReceivedPing::Shutdown;
}
// if not the payload we expected, put it back.
self.pending_ping = Some(pending);
}
// else we were acked a ping we didn't send?
// The spec doesn't require us to do anything about this,
// so for resiliency, just ignore it for now.
warn!("recv PING ack that we never sent: {:?}", ping);
ReceivedPing::Unknown
} else {
// Save the ping's payload to be sent as an acknowledgement. // Save the ping's payload to be sent as an acknowledgement.
self.sending_pong = Some(ping.into_payload()); self.pending_pong = Some(ping.into_payload());
ReceivedPing::MustAck
} }
} }
@@ -38,15 +80,46 @@ impl PingPong {
T: AsyncWrite, T: AsyncWrite,
B: Buf, B: Buf,
{ {
if let Some(pong) = self.sending_pong.take() { if let Some(pong) = self.pending_pong.take() {
if !dst.poll_ready()?.is_ready() { if !dst.poll_ready()?.is_ready() {
self.sending_pong = Some(pong); self.pending_pong = Some(pong);
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
dst.buffer(Ping::pong(pong).into()).ok().expect("invalid pong frame"); dst.buffer(Ping::pong(pong).into())
.expect("invalid pong frame");
}
Ok(Async::Ready(()))
}
/// Send any pending pings.
pub fn send_pending_ping<T, B>(&mut self, dst: &mut Codec<T, B>) -> Poll<(), io::Error>
where
T: AsyncWrite,
B: Buf,
{
if let Some(ref mut ping) = self.pending_ping {
if !ping.sent {
if !dst.poll_ready()?.is_ready() {
return Ok(Async::NotReady);
}
dst.buffer(Ping::new(ping.payload).into())
.expect("invalid ping frame");
ping.sent = true;
}
} }
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
} }
impl ReceivedPing {
pub fn is_shutdown(&self) -> bool {
match *self {
ReceivedPing::Shutdown => true,
_ => false,
}
}
}

View File

@@ -26,6 +26,15 @@ pub(super) struct Recv {
/// The stream ID of the last processed stream /// The stream ID of the last processed stream
last_processed_id: StreamId, last_processed_id: StreamId,
/// Any streams with a higher ID are ignored.
///
/// This starts as MAX, but is lowered when a GOAWAY is received.
///
/// > After sending a GOAWAY frame, the sender can discard frames for
/// > streams initiated by the receiver with identifiers higher than
/// > the identified last stream.
max_stream_id: StreamId,
/// Streams that have pending window updates /// Streams that have pending window updates
pending_window_updates: store::Queue<stream::NextWindowUpdate>, pending_window_updates: store::Queue<stream::NextWindowUpdate>,
@@ -85,7 +94,8 @@ impl Recv {
in_flight_data: 0 as WindowSize, in_flight_data: 0 as WindowSize,
next_stream_id: Ok(next_stream_id.into()), next_stream_id: Ok(next_stream_id.into()),
pending_window_updates: store::Queue::new(), pending_window_updates: store::Queue::new(),
last_processed_id: StreamId::zero(), last_processed_id: StreamId::ZERO,
max_stream_id: StreamId::MAX,
pending_accept: store::Queue::new(), pending_accept: store::Queue::new(),
pending_reset_expired: store::Queue::new(), pending_reset_expired: store::Queue::new(),
reset_duration: config.local_reset_duration, reset_duration: config.local_reset_duration,
@@ -606,12 +616,25 @@ impl Recv {
stream.notify_recv(); stream.notify_recv();
} }
pub fn go_away(&mut self, last_processed_id: StreamId) {
assert!(self.max_stream_id >= last_processed_id);
self.max_stream_id = last_processed_id;
}
pub fn recv_eof(&mut self, stream: &mut Stream) { pub fn recv_eof(&mut self, stream: &mut Stream) {
stream.state.recv_eof(); stream.state.recv_eof();
stream.notify_send(); stream.notify_send();
stream.notify_recv(); stream.notify_recv();
} }
/// Get the max ID of streams we can receive.
///
/// This gets lowered if we send a GOAWAY frame.
pub fn max_stream_id(&self) -> StreamId {
self.max_stream_id
}
fn next_stream_id(&self) -> Result<StreamId, RecvError> { fn next_stream_id(&self) -> Result<StreamId, RecvError> {
if let Ok(id) = self.next_stream_id { if let Ok(id) = self.next_stream_id {
Ok(id) Ok(id)

View File

@@ -127,6 +127,11 @@ where
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
if id > me.actions.recv.max_stream_id() {
trace!("id ({:?}) > max_stream_id ({:?}), ignoring HEADERS", id, me.actions.recv.max_stream_id());
return Ok(());
}
let key = match me.store.find_entry(id) { let key = match me.store.find_entry(id) {
Entry::Occupied(e) => e.key(), Entry::Occupied(e) => e.key(),
Entry::Vacant(e) => match me.actions.recv.open(id, &mut me.counts)? { Entry::Vacant(e) => match me.actions.recv.open(id, &mut me.counts)? {
@@ -209,6 +214,11 @@ where
let stream = match me.store.find_mut(&id) { let stream = match me.store.find_mut(&id) {
Some(stream) => stream, Some(stream) => stream,
None => { None => {
if id > me.actions.recv.max_stream_id() {
trace!("id ({:?}) > max_stream_id ({:?}), ignoring DATA", id, me.actions.recv.max_stream_id());
return Ok(());
}
trace!("recv_data; stream not found: {:?}", id); trace!("recv_data; stream not found: {:?}", id);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
}, },
@@ -243,6 +253,11 @@ where
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
} }
if id > me.actions.recv.max_stream_id() {
trace!("id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM", id, me.actions.recv.max_stream_id());
return Ok(());
}
let stream = match me.store.find_mut(&id) { let stream = match me.store.find_mut(&id) {
Some(stream) => stream, Some(stream) => stream,
None => { None => {
@@ -295,7 +310,7 @@ where
last_processed_id last_processed_id
} }
pub fn recv_goaway(&mut self, frame: &frame::GoAway) { pub fn recv_go_away(&mut self, frame: &frame::GoAway) {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
@@ -307,6 +322,8 @@ where
let last_stream_id = frame.last_stream_id(); let last_stream_id = frame.last_stream_id();
let err = frame.reason().into(); let err = frame.reason().into();
actions.recv.go_away(last_stream_id);
me.store me.store
.for_each(|stream| if stream.id > last_stream_id { .for_each(|stream| if stream.id > last_stream_id {
counts.transition(stream, |_, stream| { counts.transition(stream, |_, stream| {
@@ -631,6 +648,13 @@ where
actions.recv.enqueue_reset_expiration(stream, counts) actions.recv.enqueue_reset_expiration(stream, counts)
}) })
} }
pub fn send_go_away(&mut self, last_processed_id: StreamId) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let actions = &mut me.actions;
actions.recv.go_away(last_processed_id);
}
} }
impl<B> Streams<B, client::Peer> impl<B> Streams<B, client::Peer>

View File

@@ -427,12 +427,40 @@ where
self.connection.poll().map_err(Into::into) self.connection.poll().map_err(Into::into)
} }
/// Sets the connection to a GOAWAY state. Does not close connection immediately. #[deprecated(note="use abrupt_shutdown or graceful_shutdown instead", since="0.1.4")]
/// #[doc(hidden)]
/// This closes the stream after sending a GOAWAY frame
/// and flushing the codec. Must continue being polled to close connection.
pub fn close_connection(&mut self) { pub fn close_connection(&mut self) {
self.connection.close_connection(); self.graceful_shutdown();
}
/// Sets the connection to a GOAWAY state.
///
/// Does not terminate the connection. Must continue being polled to close
/// connection.
///
/// After flushing the GOAWAY frame, the connection is closed. Any
/// outstanding streams do not prevent the connection from closing. This
/// should usually be reserved for shutting down when something bad
/// external to `h2` has happened, and open streams cannot be properly
/// handled.
///
/// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
pub fn abrupt_shutdown(&mut self, reason: Reason) {
self.connection.go_away_now(reason);
}
/// Starts a [graceful shutdown][1] process.
///
/// Must continue being polled to close connection.
///
/// It's possible to receive more requests after calling this method, since
/// they might have been in-flight from the client already. After about
/// 1 RTT, no new requests should be accepted. Once all active streams
/// have completed, the connection is closed.
///
/// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
pub fn graceful_shutdown(&mut self) {
self.connection.go_away_gracefully();
} }
} }

View File

@@ -1,3 +1,4 @@
#![deny(warnings)]
pub mod support; pub mod support;
use support::prelude::*; use support::prelude::*;
@@ -205,7 +206,7 @@ fn sends_reset_cancel_when_req_body_is_dropped() {
} }
#[test] #[test]
fn sends_goaway_when_serv_closes_connection() { fn abrupt_shutdown() {
let _ = ::env_logger::try_init(); let _ = ::env_logger::try_init();
let (io, client) = mock::new(); let (io, client) = mock::new();
@@ -222,7 +223,7 @@ fn sends_goaway_when_serv_closes_connection() {
let srv = server::handshake(io).expect("handshake").and_then(|srv| { let srv = server::handshake(io).expect("handshake").and_then(|srv| {
srv.into_future().unwrap().and_then(|(_, mut srv)| { srv.into_future().unwrap().and_then(|(_, mut srv)| {
srv.close_connection(); srv.abrupt_shutdown(Reason::NO_ERROR);
srv.into_future().unwrap() srv.into_future().unwrap()
}) })
}); });
@@ -231,7 +232,7 @@ fn sends_goaway_when_serv_closes_connection() {
} }
#[test] #[test]
fn serve_request_then_serv_closes_connection() { fn graceful_shutdown() {
let _ = ::env_logger::try_init(); let _ = ::env_logger::try_init();
let (io, client) = mock::new(); let (io, client) = mock::new();
@@ -241,37 +242,74 @@ fn serve_request_then_serv_closes_connection() {
.recv_settings() .recv_settings()
.send_frame( .send_frame(
frames::headers(1) frames::headers(1)
.request("GET", "https://example.com/"), .request("GET", "https://example.com/")
.eos(),
) )
.recv_frame(frames::go_away(StreamId::MAX_CLIENT))
.recv_frame(frames::ping(frame::Ping::SHUTDOWN))
.recv_frame(frames::headers(1).response(200).eos()) .recv_frame(frames::headers(1).response(200).eos())
.recv_frame(frames::reset(1).cancel()) // Pretend this stream was sent while the GOAWAY was in flight
.send_frame( .send_frame(
frames::headers(3) frames::headers(3)
.request("GET", "https://example.com/"), .request("POST", "https://example.com/"),
) )
.send_frame(frames::ping(frame::Ping::SHUTDOWN).pong())
.recv_frame(frames::go_away(3)) .recv_frame(frames::go_away(3))
// streams sent after GOAWAY receive no response // streams sent after GOAWAY receive no response
.send_frame( .send_frame(
frames::headers(5) frames::headers(7)
.request("GET", "https://example.com/"), .request("GET", "https://example.com/"),
) )
.close(); .send_frame(frames::data(7, "").eos())
.send_frame(frames::data(3, "").eos())
.recv_frame(frames::headers(3).response(200).eos())
.close(); //TODO: closed()?
let srv = server::handshake(io).expect("handshake").and_then(|srv| { let srv = server::handshake(io)
srv.into_future().unwrap().and_then(|(reqstream, srv)| { .expect("handshake")
.and_then(|srv| {
srv.into_future().unwrap()
})
.and_then(|(reqstream, mut srv)| {
let (req, mut stream) = reqstream.unwrap(); let (req, mut stream) = reqstream.unwrap();
assert_eq!(req.method(), &http::Method::GET); assert_eq!(req.method(), &http::Method::GET);
let rsp = http::Response::builder().status(200).body(()).unwrap(); srv.graceful_shutdown();
let rsp = http::Response::builder()
.status(200)
.body(())
.unwrap();
stream.send_response(rsp, true).unwrap(); stream.send_response(rsp, true).unwrap();
srv.into_future().unwrap().and_then(|(_reqstream, mut srv)| { srv.into_future().unwrap()
srv.close_connection();
srv.into_future().unwrap()
})
}) })
}); .and_then(|(reqstream, srv)| {
let (req, mut stream) = reqstream.unwrap();
assert_eq!(req.method(), &http::Method::POST);
let body = req.into_parts().1;
let body = body.concat2().and_then(move |buf| {
assert!(buf.is_empty());
let rsp = http::Response::builder()
.status(200)
.body(())
.unwrap();
stream.send_response(rsp, true).unwrap();
Ok(())
});
srv.into_future()
.map(|(req, _srv)| {
assert!(req.is_none(), "unexpected request");
})
.drive(body)
.and_then(|(srv, ())| {
srv.expect("srv")
})
});
srv.join(client).wait().expect("wait"); srv.join(client).wait().expect("wait");
} }