diff --git a/src/codec/error.rs b/src/codec/error.rs index 74063c8..e89d5a1 100644 --- a/src/codec/error.rs +++ b/src/codec/error.rs @@ -40,6 +40,7 @@ pub enum UserError { /// The released capacity is larger than claimed capacity. ReleaseCapacityTooBig, + /// The stream ID space is overflowed. /// /// A new connection is needed. diff --git a/src/frame/ping.rs b/src/frame/ping.rs index af4742a..7f2c1ba 100644 --- a/src/frame/ping.rs +++ b/src/frame/ping.rs @@ -11,8 +11,18 @@ pub struct Ping { payload: Payload, } +// This was just 8 randomly generated bytes. We use something besides just +// zeroes to distinguish this specific PING from any other. +const SHUTDOWN_PAYLOAD: Payload = [0x0b, 0x7b, 0xa2, 0xf0, 0x8b, 0x9b, 0xfe, 0x54]; + impl Ping { + #[cfg(feature = "unstable")] + pub const SHUTDOWN: Payload = SHUTDOWN_PAYLOAD; + + #[cfg(not(feature = "unstable"))] + pub(crate) const SHUTDOWN: Payload = SHUTDOWN_PAYLOAD; + pub fn new(payload: Payload) -> Ping { Ping { ack: false, @@ -31,7 +41,6 @@ impl Ping { self.ack } - #[cfg(feature = "unstable")] pub fn payload(&self) -> &Payload { &self.payload } diff --git a/src/frame/stream_id.rs b/src/frame/stream_id.rs index 7406e70..17ae597 100644 --- a/src/frame/stream_id.rs +++ b/src/frame/stream_id.rs @@ -14,6 +14,8 @@ impl StreamId { pub const MAX: StreamId = StreamId(u32::MAX >> 1); + pub const MAX_CLIENT: StreamId = StreamId((u32::MAX >> 1) - 1); + /// Parse the stream ID #[inline] pub fn parse(buf: &[u8]) -> (StreamId, bool) { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index c6d241e..c66c003 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -10,6 +10,7 @@ use futures::Stream; use tokio_io::{AsyncRead, AsyncWrite}; use std::marker::PhantomData; +use std::io; use std::time::Duration; /// An H2 connection @@ -30,6 +31,9 @@ where /// Read / write frame values codec: Codec>, + /// Pending GOAWAY frames to write. + go_away: GoAway, + /// Ping/pong handler ping_pong: PingPong, @@ -57,11 +61,8 @@ enum State { /// Currently open in a sane state Open, - /// Waiting to send a GOAWAY frame - GoAway(frame::GoAway), - /// The codec must be flushed - Flush(Reason), + Closing(Reason), /// In a closed state Closed(Reason), @@ -95,6 +96,7 @@ where state: State::Open, error: None, codec: codec, + go_away: GoAway::new(), ping_pong: PingPong::new(), settings: Settings::new(), streams: streams, @@ -111,9 +113,9 @@ where /// Returns `RecvError` as this may raise errors that are caused by delayed /// processing of received frames. fn poll_ready(&mut self) -> Poll<(), RecvError> { - // The order of these calls don't really matter too much as only one - // should have pending work. + // The order of these calls don't really matter too much try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); + try_ready!(self.ping_pong.send_pending_ping(&mut self.codec)); try_ready!( self.settings .send_pending_ack(&mut self.codec, &mut self.streams) @@ -123,9 +125,47 @@ where Ok(().into()) } - fn transition_to_go_away(&mut self, id: StreamId, e: Reason) { - let goaway = frame::GoAway::new(id, e); - self.state = State::GoAway(goaway); + /// Send any pending GOAWAY frames. + /// + /// This will return `Some(reason)` if the connection should be closed + /// afterwards. If this is a graceful shutdown, this returns `None`. + fn poll_go_away(&mut self) -> Poll, io::Error> { + self.go_away.send_pending_go_away(&mut self.codec) + } + + fn go_away(&mut self, id: StreamId, e: Reason) { + let frame = frame::GoAway::new(id, e); + self.streams.send_go_away(id); + self.go_away.go_away(frame); + } + + pub fn go_away_now(&mut self, e: Reason) { + let last_processed_id = self.streams.last_processed_id(); + let frame = frame::GoAway::new(last_processed_id, e); + self.go_away.go_away_now(frame); + } + + fn take_error(&mut self, ours: Reason) -> Poll<(), proto::Error> { + let reason = if let Some(theirs) = self.error.take() { + match (ours, theirs) { + // If either side reported an error, return that + // to the user. + (Reason::NO_ERROR, err) | (err, Reason::NO_ERROR) => err, + // If both sides reported an error, give their + // error back to th user. We assume our error + // was a consequence of their error, and less + // important. + (_, theirs) => theirs, + } + } else { + ours + }; + + if reason == Reason::NO_ERROR { + Ok(().into()) + } else { + Err(proto::Error::Proto(reason)) + } } /// Closes the connection by transitioning to a GOAWAY state @@ -134,15 +174,10 @@ where // If we poll() and realize that there are no streams or references // then we can close the connection by transitioning to GOAWAY if self.streams.num_active_streams() == 0 && !self.streams.has_streams_or_other_references() { - self.close_connection(); + self.go_away_now(Reason::NO_ERROR); } } - /// Closes the connection by transitioning to a GOAWAY state - pub fn close_connection(&mut self) { - let last_processed_id = self.streams.last_processed_id(); - self.transition_to_go_away(last_processed_id, Reason::NO_ERROR); - } /// Advances the internal state of the connection. pub fn poll(&mut self) -> Poll<(), proto::Error> { @@ -155,7 +190,7 @@ where State::Open => { match self.poll2() { // The connection has shutdown normally - Ok(Async::Ready(())) => return Ok(().into()), + Ok(Async::Ready(())) => return self.take_error(Reason::NO_ERROR), // The connection is not ready to make progress Ok(Async::NotReady) => { // Ensure all window updates have been sent. @@ -163,10 +198,9 @@ where // This will also handle flushing `self.codec` try_ready!(self.streams.poll_complete(&mut self.codec)); - if self.error.is_some() { + if self.error.is_some() || self.go_away.should_close_on_idle() { if self.streams.num_active_streams() == 0 { - let last_processed_id = self.streams.last_processed_id(); - self.transition_to_go_away(last_processed_id, Reason::NO_ERROR); + self.go_away_now(Reason::NO_ERROR); continue; } } @@ -179,9 +213,19 @@ where Err(Connection(e)) => { debug!("Connection::poll; err={:?}", e); + // We may have already sent a GOAWAY for this error, + // if so, don't send another, just flush and close up. + if let Some(reason) = self.go_away.going_away_reason() { + if reason == e { + trace!(" -> already going away"); + self.state = State::Closing(e); + continue; + } + } + // Reset all active streams - let last_processed_id = self.streams.recv_err(&e.into()); - self.transition_to_go_away(last_processed_id, e); + self.streams.recv_err(&e.into()); + self.go_away_now(e); }, // Attempting to read a frame resulted in a stream level error. // This is handled by resetting the frame then trying to read @@ -207,48 +251,16 @@ where return Err(e); }, } - }, - State::GoAway(frame) => { - // Ensure the codec is ready to accept the frame - try_ready!(self.codec.poll_ready()); - - // Buffer the GOAWAY frame - self.codec - .buffer(frame.into()) - .ok() - .expect("invalid GO_AWAY frame"); - - // GOAWAY sent, transition the connection to a closed state - // Determine what error code should be returned to user. - let reason = if let Some(theirs) = self.error.take() { - let ours = frame.reason(); - match (ours, theirs) { - // If either side reported an error, return that - // to the user. - (Reason::NO_ERROR, err) | (err, Reason::NO_ERROR) => err, - // If both sides reported an error, give their - // error back to th user. We assume our error - // was a consequence of their error, and less - // important. - (_, theirs) => theirs, - } - } else { - frame.reason() - }; - self.state = State::Flush(reason); - }, - State::Flush(reason) => { + } + State::Closing(reason) => { + trace!("connection closing after flush, reason={:?}", reason); // Flush the codec try_ready!(self.codec.flush()); // Transition the state to error self.state = State::Closed(reason); }, - State::Closed(reason) => if let Reason::NO_ERROR = reason { - return Ok(Async::Ready(())); - } else { - return Err(reason.into()); - }, + State::Closed(reason) => return self.take_error(reason), } } } @@ -263,6 +275,17 @@ where loop { // First, ensure that the `Connection` is able to receive a frame + // + // The order here matters: + // - poll_go_away may buffer a graceful shutdown GOAWAY frame + // - If it has, we've also added a PING to be sent in poll_ready + if let Some(reason) = try_ready!(self.poll_go_away()) { + if self.go_away.should_close_now() { + return Err(RecvError::Connection(reason)); + } + // Only NO_ERROR should be waiting for idle + debug_assert_eq!(reason, Reason::NO_ERROR, "graceful GOAWAY should be NO_ERROR"); + } try_ready!(self.poll_ready()); match try_ready!(self.codec.poll()) { @@ -292,12 +315,21 @@ where // but should allow continuing to process current streams // until they are all EOS. Once they are, State should // transition to GoAway. - self.streams.recv_goaway(&frame); + self.streams.recv_go_away(&frame); self.error = Some(frame.reason()); }, Some(Ping(frame)) => { trace!("recv PING; frame={:?}", frame); - self.ping_pong.recv_ping(frame); + let status = self.ping_pong.recv_ping(frame); + if status.is_shutdown() { + assert!( + self.go_away.is_going_away(), + "received unexpected shutdown ping" + ); + + let last_processed_id = self.streams.last_processed_id(); + self.go_away(last_processed_id, Reason::NO_ERROR); + } }, Some(WindowUpdate(frame)) => { trace!("recv WINDOW_UPDATE; frame={:?}", frame); @@ -339,4 +371,29 @@ where pub fn next_incoming(&mut self) -> Option> { self.streams.next_incoming() } + + // Graceful shutdown only makes sense for server peers. + pub fn go_away_gracefully(&mut self) { + if self.go_away.is_going_away() { + // No reason to start a new one. + return; + } + + // According to http://httpwg.org/specs/rfc7540.html#GOAWAY: + // + // > A server that is attempting to gracefully shut down a connection + // > SHOULD send an initial GOAWAY frame with the last stream + // > identifier set to 231-1 and a NO_ERROR code. This signals to the + // > client that a shutdown is imminent and that initiating further + // > requests is prohibited. After allowing time for any in-flight + // > stream creation (at least one round-trip time), the server can + // > send another GOAWAY frame with an updated last stream identifier. + // > This ensures that a connection can be cleanly shut down without + // > losing requests. + self.go_away(StreamId::MAX_CLIENT, Reason::NO_ERROR); + + // We take the advice of waiting 1 RTT literally, and wait + // for a pong before proceeding. + self.ping_pong.ping_shutdown(); + } } diff --git a/src/proto/go_away.rs b/src/proto/go_away.rs new file mode 100644 index 0000000..ebcb6c6 --- /dev/null +++ b/src/proto/go_away.rs @@ -0,0 +1,134 @@ +use codec::Codec; +use frame::{self, Reason, StreamId}; + +use bytes::Buf; +use futures::{Async, Poll}; +use std::io; +use tokio_io::AsyncWrite; + +/// Manages our sending of GOAWAY frames. +#[derive(Debug)] +pub(super) struct GoAway { + /// Whether the connection should close now, or wait until idle. + close_now: bool, + /// Records if we've sent any GOAWAY before. + going_away: Option, + + /// A GOAWAY frame that must be buffered in the Codec immediately. + pending: Option, +} + +/// Keeps a memory of any GOAWAY frames we've sent before. +/// +/// This looks very similar to a `frame::GoAway`, but is a separate type. Why? +/// Mostly for documentation purposes. This type is to record status. If it +/// were a `frame::GoAway`, it might appear like we eventually wanted to +/// serialize it. We **only** want to be able to look up these fields at a +/// later time. +/// +/// (Technically, `frame::GoAway` should gain an opaque_debug_data field as +/// well, and we wouldn't want to save that here to accidentally dump in logs, +/// or waste struct space.) +#[derive(Debug)] +struct GoingAway { + /// Stores the highest stream ID of a GOAWAY that has been sent. + /// + /// It's illegal to send a subsequent GOAWAY with a higher ID. + last_processed_id: StreamId, + + /// Records the error code of any GOAWAY frame sent. + reason: Reason, +} + +impl GoAway { + pub fn new() -> Self { + GoAway { + close_now: false, + going_away: None, + pending: None, + } + } + + /// Enqueue a GOAWAY frame to be written. + /// + /// The connection is expected to continue to run until idle. + pub fn go_away(&mut self, f: frame::GoAway) { + if let Some(ref going_away) = self.going_away { + assert!( + f.last_stream_id() <= going_away.last_processed_id, + "GOAWAY stream IDs shouldn't be higher; \ + last_processed_id = {:?}, f.last_stream_id() = {:?}", + going_away.last_processed_id, + f.last_stream_id(), + ); + } + + self.going_away = Some(GoingAway { + last_processed_id: f.last_stream_id(), + reason: f.reason(), + }); + self.pending = Some(f); + } + + pub fn go_away_now(&mut self, f: frame::GoAway) { + self.close_now = true; + if let Some(ref going_away) = self.going_away { + // Prevent sending the same GOAWAY twice. + if going_away.last_processed_id == f.last_stream_id() + && going_away.reason == f.reason() { + return; + } + } + self.go_away(f); + } + + /// Return if a GOAWAY has ever been scheduled. + pub fn is_going_away(&self) -> bool { + self.going_away.is_some() + } + + /// Return the last Reason we've sent. + pub fn going_away_reason(&self) -> Option { + self.going_away + .as_ref() + .map(|g| g.reason) + } + + /// Returns if the connection should close now, or wait until idle. + pub fn should_close_now(&self) -> bool { + self.pending.is_none() && self.close_now + } + + /// Returns if the connection should be closed when idle. + pub fn should_close_on_idle(&self) -> bool { + !self.close_now && self.going_away + .as_ref() + .map(|g| g.last_processed_id != StreamId::MAX_CLIENT) + .unwrap_or(false) + } + + /// Try to write a pending GOAWAY frame to the buffer. + /// + /// If a frame is written, the `Reason` of the GOAWAY is returned. + pub fn send_pending_go_away(&mut self, dst: &mut Codec) -> Poll, io::Error> + where + T: AsyncWrite, + B: Buf, + { + if let Some(frame) = self.pending.take() { + if !dst.poll_ready()?.is_ready() { + self.pending = Some(frame); + return Ok(Async::NotReady); + } + + let reason = frame.reason(); + dst.buffer(frame.into()) + .ok() + .expect("invalid GOAWAY frame"); + + return Ok(Async::Ready(Some(reason))); + } + + Ok(Async::Ready(None)) + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index e1aec8d..f41afe7 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,5 +1,6 @@ mod connection; mod error; +mod go_away; mod peer; mod ping_pong; mod settings; @@ -13,6 +14,7 @@ pub(crate) use self::streams::Prioritized; use codec::Codec; +use self::go_away::GoAway; use self::ping_pong::PingPong; use self::settings::Settings; diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index 21ce188..8cce222 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -10,25 +10,67 @@ use tokio_io::AsyncWrite; /// Acknowledges ping requests from the remote. #[derive(Debug)] pub struct PingPong { - sending_pong: Option, + pending_ping: Option, + pending_pong: Option, +} + +#[derive(Debug)] +struct PendingPing { + payload: PingPayload, + sent: bool, +} + +/// Status returned from `PingPong::recv_ping`. +#[derive(Debug)] +pub(crate) enum ReceivedPing { + MustAck, + Unknown, + Shutdown, } impl PingPong { pub fn new() -> Self { PingPong { - sending_pong: None, + pending_ping: None, + pending_pong: None, } } + pub fn ping_shutdown(&mut self) { + assert!(self.pending_ping.is_none()); + + self.pending_ping = Some(PendingPing { + payload: Ping::SHUTDOWN, + sent: false, + }); + } + /// Process a ping - pub fn recv_ping(&mut self, ping: Ping) { + pub(crate) fn recv_ping(&mut self, ping: Ping) -> ReceivedPing { // The caller should always check that `send_pongs` returns ready before // calling `recv_ping`. - assert!(self.sending_pong.is_none()); + assert!(self.pending_pong.is_none()); - if !ping.is_ack() { + if ping.is_ack() { + if let Some(pending) = self.pending_ping.take() { + if &pending.payload == ping.payload() { + trace!("recv PING ack"); + return ReceivedPing::Shutdown; + } + + // if not the payload we expected, put it back. + self.pending_ping = Some(pending); + } + + // else we were acked a ping we didn't send? + // The spec doesn't require us to do anything about this, + // so for resiliency, just ignore it for now. + warn!("recv PING ack that we never sent: {:?}", ping); + ReceivedPing::Unknown + } else { // Save the ping's payload to be sent as an acknowledgement. - self.sending_pong = Some(ping.into_payload()); + self.pending_pong = Some(ping.into_payload()); + ReceivedPing::MustAck } } @@ -38,15 +80,46 @@ impl PingPong { T: AsyncWrite, B: Buf, { - if let Some(pong) = self.sending_pong.take() { + if let Some(pong) = self.pending_pong.take() { if !dst.poll_ready()?.is_ready() { - self.sending_pong = Some(pong); + self.pending_pong = Some(pong); return Ok(Async::NotReady); } - dst.buffer(Ping::pong(pong).into()).ok().expect("invalid pong frame"); + dst.buffer(Ping::pong(pong).into()) + .expect("invalid pong frame"); + } + + Ok(Async::Ready(())) + } + + /// Send any pending pings. + pub fn send_pending_ping(&mut self, dst: &mut Codec) -> Poll<(), io::Error> + where + T: AsyncWrite, + B: Buf, + { + if let Some(ref mut ping) = self.pending_ping { + if !ping.sent { + if !dst.poll_ready()?.is_ready() { + return Ok(Async::NotReady); + } + + dst.buffer(Ping::new(ping.payload).into()) + .expect("invalid ping frame"); + ping.sent = true; + } } Ok(Async::Ready(())) } } + +impl ReceivedPing { + pub fn is_shutdown(&self) -> bool { + match *self { + ReceivedPing::Shutdown => true, + _ => false, + } + } +} diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 6e2d524..e27ec10 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -26,6 +26,15 @@ pub(super) struct Recv { /// The stream ID of the last processed stream last_processed_id: StreamId, + /// Any streams with a higher ID are ignored. + /// + /// This starts as MAX, but is lowered when a GOAWAY is received. + /// + /// > After sending a GOAWAY frame, the sender can discard frames for + /// > streams initiated by the receiver with identifiers higher than + /// > the identified last stream. + max_stream_id: StreamId, + /// Streams that have pending window updates pending_window_updates: store::Queue, @@ -85,7 +94,8 @@ impl Recv { in_flight_data: 0 as WindowSize, next_stream_id: Ok(next_stream_id.into()), pending_window_updates: store::Queue::new(), - last_processed_id: StreamId::zero(), + last_processed_id: StreamId::ZERO, + max_stream_id: StreamId::MAX, pending_accept: store::Queue::new(), pending_reset_expired: store::Queue::new(), reset_duration: config.local_reset_duration, @@ -606,12 +616,25 @@ impl Recv { stream.notify_recv(); } + pub fn go_away(&mut self, last_processed_id: StreamId) { + assert!(self.max_stream_id >= last_processed_id); + + self.max_stream_id = last_processed_id; + } + pub fn recv_eof(&mut self, stream: &mut Stream) { stream.state.recv_eof(); stream.notify_send(); stream.notify_recv(); } + /// Get the max ID of streams we can receive. + /// + /// This gets lowered if we send a GOAWAY frame. + pub fn max_stream_id(&self) -> StreamId { + self.max_stream_id + } + fn next_stream_id(&self) -> Result { if let Ok(id) = self.next_stream_id { Ok(id) diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 00970e2..d18eaaf 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -127,6 +127,11 @@ where let mut me = self.inner.lock().unwrap(); let me = &mut *me; + if id > me.actions.recv.max_stream_id() { + trace!("id ({:?}) > max_stream_id ({:?}), ignoring HEADERS", id, me.actions.recv.max_stream_id()); + return Ok(()); + } + let key = match me.store.find_entry(id) { Entry::Occupied(e) => e.key(), Entry::Vacant(e) => match me.actions.recv.open(id, &mut me.counts)? { @@ -209,6 +214,11 @@ where let stream = match me.store.find_mut(&id) { Some(stream) => stream, None => { + if id > me.actions.recv.max_stream_id() { + trace!("id ({:?}) > max_stream_id ({:?}), ignoring DATA", id, me.actions.recv.max_stream_id()); + return Ok(()); + } + trace!("recv_data; stream not found: {:?}", id); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); }, @@ -243,6 +253,11 @@ where return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); } + if id > me.actions.recv.max_stream_id() { + trace!("id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM", id, me.actions.recv.max_stream_id()); + return Ok(()); + } + let stream = match me.store.find_mut(&id) { Some(stream) => stream, None => { @@ -295,7 +310,7 @@ where last_processed_id } - pub fn recv_goaway(&mut self, frame: &frame::GoAway) { + pub fn recv_go_away(&mut self, frame: &frame::GoAway) { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -307,6 +322,8 @@ where let last_stream_id = frame.last_stream_id(); let err = frame.reason().into(); + actions.recv.go_away(last_stream_id); + me.store .for_each(|stream| if stream.id > last_stream_id { counts.transition(stream, |_, stream| { @@ -631,6 +648,13 @@ where actions.recv.enqueue_reset_expiration(stream, counts) }) } + + pub fn send_go_away(&mut self, last_processed_id: StreamId) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + let actions = &mut me.actions; + actions.recv.go_away(last_processed_id); + } } impl Streams diff --git a/src/server.rs b/src/server.rs index 9a0e718..f2d7049 100644 --- a/src/server.rs +++ b/src/server.rs @@ -427,12 +427,40 @@ where self.connection.poll().map_err(Into::into) } - /// Sets the connection to a GOAWAY state. Does not close connection immediately. - /// - /// This closes the stream after sending a GOAWAY frame - /// and flushing the codec. Must continue being polled to close connection. + #[deprecated(note="use abrupt_shutdown or graceful_shutdown instead", since="0.1.4")] + #[doc(hidden)] pub fn close_connection(&mut self) { - self.connection.close_connection(); + self.graceful_shutdown(); + } + + /// Sets the connection to a GOAWAY state. + /// + /// Does not terminate the connection. Must continue being polled to close + /// connection. + /// + /// After flushing the GOAWAY frame, the connection is closed. Any + /// outstanding streams do not prevent the connection from closing. This + /// should usually be reserved for shutting down when something bad + /// external to `h2` has happened, and open streams cannot be properly + /// handled. + /// + /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown). + pub fn abrupt_shutdown(&mut self, reason: Reason) { + self.connection.go_away_now(reason); + } + + /// Starts a [graceful shutdown][1] process. + /// + /// Must continue being polled to close connection. + /// + /// It's possible to receive more requests after calling this method, since + /// they might have been in-flight from the client already. After about + /// 1 RTT, no new requests should be accepted. Once all active streams + /// have completed, the connection is closed. + /// + /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY + pub fn graceful_shutdown(&mut self) { + self.connection.go_away_gracefully(); } } diff --git a/tests/server.rs b/tests/server.rs index 18b4998..47e96e0 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -1,3 +1,4 @@ +#![deny(warnings)] pub mod support; use support::prelude::*; @@ -205,7 +206,7 @@ fn sends_reset_cancel_when_req_body_is_dropped() { } #[test] -fn sends_goaway_when_serv_closes_connection() { +fn abrupt_shutdown() { let _ = ::env_logger::try_init(); let (io, client) = mock::new(); @@ -222,7 +223,7 @@ fn sends_goaway_when_serv_closes_connection() { let srv = server::handshake(io).expect("handshake").and_then(|srv| { srv.into_future().unwrap().and_then(|(_, mut srv)| { - srv.close_connection(); + srv.abrupt_shutdown(Reason::NO_ERROR); srv.into_future().unwrap() }) }); @@ -231,7 +232,7 @@ fn sends_goaway_when_serv_closes_connection() { } #[test] -fn serve_request_then_serv_closes_connection() { +fn graceful_shutdown() { let _ = ::env_logger::try_init(); let (io, client) = mock::new(); @@ -241,37 +242,74 @@ fn serve_request_then_serv_closes_connection() { .recv_settings() .send_frame( frames::headers(1) - .request("GET", "https://example.com/"), + .request("GET", "https://example.com/") + .eos(), ) + .recv_frame(frames::go_away(StreamId::MAX_CLIENT)) + .recv_frame(frames::ping(frame::Ping::SHUTDOWN)) .recv_frame(frames::headers(1).response(200).eos()) - .recv_frame(frames::reset(1).cancel()) + // Pretend this stream was sent while the GOAWAY was in flight .send_frame( frames::headers(3) - .request("GET", "https://example.com/"), + .request("POST", "https://example.com/"), ) + .send_frame(frames::ping(frame::Ping::SHUTDOWN).pong()) .recv_frame(frames::go_away(3)) // streams sent after GOAWAY receive no response .send_frame( - frames::headers(5) + frames::headers(7) .request("GET", "https://example.com/"), ) - .close(); + .send_frame(frames::data(7, "").eos()) + .send_frame(frames::data(3, "").eos()) + .recv_frame(frames::headers(3).response(200).eos()) + .close(); //TODO: closed()? - let srv = server::handshake(io).expect("handshake").and_then(|srv| { - srv.into_future().unwrap().and_then(|(reqstream, srv)| { + let srv = server::handshake(io) + .expect("handshake") + .and_then(|srv| { + srv.into_future().unwrap() + }) + .and_then(|(reqstream, mut srv)| { let (req, mut stream) = reqstream.unwrap(); assert_eq!(req.method(), &http::Method::GET); - let rsp = http::Response::builder().status(200).body(()).unwrap(); + srv.graceful_shutdown(); + + let rsp = http::Response::builder() + .status(200) + .body(()) + .unwrap(); stream.send_response(rsp, true).unwrap(); - srv.into_future().unwrap().and_then(|(_reqstream, mut srv)| { - srv.close_connection(); - srv.into_future().unwrap() - }) + srv.into_future().unwrap() }) - }); + .and_then(|(reqstream, srv)| { + let (req, mut stream) = reqstream.unwrap(); + assert_eq!(req.method(), &http::Method::POST); + let body = req.into_parts().1; + + let body = body.concat2().and_then(move |buf| { + assert!(buf.is_empty()); + + let rsp = http::Response::builder() + .status(200) + .body(()) + .unwrap(); + stream.send_response(rsp, true).unwrap(); + Ok(()) + }); + + srv.into_future() + .map(|(req, _srv)| { + assert!(req.is_none(), "unexpected request"); + }) + .drive(body) + .and_then(|(srv, ())| { + srv.expect("srv") + }) + }); srv.join(client).wait().expect("wait"); }