From d6dc63276f0bd8b79ad4fc463f945e50dd32b90c Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 24 Mar 2020 17:36:13 -0700 Subject: [PATCH] Fix receiving a GOAWAY frame from updating the max recv ID Receiving a GOAWAY should update the max *send* ID, it shouldn't affect the max recv. --- src/proto/streams/send.rs | 30 ++++++++++++++++ src/proto/streams/streams.rs | 20 ++--------- tests/h2-tests/tests/server.rs | 49 +++++++++++++++++++++++++++ tests/h2-tests/tests/stream_states.rs | 34 +++++++++++++++++++ 4 files changed, 116 insertions(+), 17 deletions(-) diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index a1199b2..4d38593 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -18,6 +18,15 @@ pub(super) struct Send { /// Stream identifier to use for next initialized stream. next_stream_id: Result, + /// Any streams with a higher ID are ignored. + /// + /// This starts as MAX, but is lowered when a GOAWAY is received. + /// + /// > After sending a GOAWAY frame, the sender can discard frames for + /// > streams initiated by the receiver with identifiers higher than + /// > the identified last stream. + max_stream_id: StreamId, + /// Initial window size of locally initiated streams init_window_sz: WindowSize, @@ -37,6 +46,7 @@ impl Send { pub fn new(config: &Config) -> Self { Send { init_window_sz: config.remote_init_window_sz, + max_stream_id: StreamId::MAX, next_stream_id: Ok(config.local_next_stream_id), prioritize: Prioritize::new(config), } @@ -370,6 +380,26 @@ impl Send { Ok(()) } + pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), RecvError> { + if last_stream_id > self.max_stream_id { + // The remote endpoint sent a `GOAWAY` frame indicating a stream + // that we never sent, or that we have already terminated on account + // of previous `GOAWAY` frame. In either case, that is illegal. + // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase + // the value they send in the last stream identifier, since the + // peers might already have retried unprocessed requests on another + // connection.") + proto_err!(conn: + "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})", + last_stream_id, self.max_stream_id, + ); + return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + } + + self.max_stream_id = last_stream_id; + Ok(()) + } + pub fn recv_err( &mut self, buffer: &mut Buffer>, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 527969b..8f61861 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -392,25 +392,11 @@ where let send_buffer = &mut *send_buffer; let last_stream_id = frame.last_stream_id(); + + actions.send.recv_go_away(last_stream_id)?; + let err = frame.reason().into(); - if last_stream_id > actions.recv.max_stream_id() { - // The remote endpoint sent a `GOAWAY` frame indicating a stream - // that we never sent, or that we have already terminated on account - // of previous `GOAWAY` frame. In either case, that is illegal. - // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase - // the value they send in the last stream identifier, since the - // peers might already have retried unprocessed requests on another - // connection.") - proto_err!(conn: - "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})", - last_stream_id, actions.recv.max_stream_id(), - ); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); - } - - actions.recv.go_away(last_stream_id); - me.store .for_each(|stream| { if stream.id > last_stream_id { diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index 17a2801..1916138 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -656,6 +656,55 @@ async fn graceful_shutdown() { join(client, srv).await; } +#[tokio::test] +async fn goaway_even_if_client_sent_goaway() { + let _ = env_logger::try_init(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + client + .send_frame( + frames::headers(5) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + // Ping-pong so as to wait until server gets req + client.ping_pong([0; 8]).await; + client.send_frame(frames::go_away(0)).await; + // 2^31 - 1 = 2147483647 + // Note: not using a constant in the library because library devs + // can be unsmart. + client.recv_frame(frames::go_away(2147483647)).await; + client.recv_frame(frames::ping(frame::Ping::SHUTDOWN)).await; + client + .recv_frame(frames::headers(5).response(200).eos()) + .await; + client + .send_frame(frames::ping(frame::Ping::SHUTDOWN).pong()) + .await; + client.recv_frame(frames::go_away(5)).await; + client.recv_eof().await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + assert_eq!(req.method(), &http::Method::GET); + + srv.graceful_shutdown(); + + let rsp = http::Response::builder().status(200).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + + assert!(srv.next().await.is_none(), "unexpected request"); + }; + + join(client, srv).await; +} + #[tokio::test] async fn sends_reset_cancel_when_res_body_is_dropped() { let _ = env_logger::try_init(); diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index 5fa4e9f..dd0316c 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -330,6 +330,40 @@ async fn recv_goaway_finishes_processed_streams() { join(srv, h2).await; } +#[tokio::test] +async fn recv_goaway_with_higher_last_processed_id() { + let _ = env_logger::try_init(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + srv.send_frame(frames::go_away(1)).await; + // a bigger goaway? kaboom + srv.send_frame(frames::go_away(3)).await; + // expecting a goaway of 0, since server never initiated a stream + srv.recv_frame(frames::go_away(0).protocol_error()).await; + //.close(); + }; + + let client = async move { + let (mut client, mut conn) = client::handshake(io).await.expect("handshake"); + let err = conn + .drive(client.get("https://example.com")) + .await + .expect_err("client should error"); + assert_eq!(err.reason(), Some(Reason::PROTOCOL_ERROR)); + }; + + join(srv, client).await; +} + #[tokio::test] async fn recv_next_stream_id_updated_by_malformed_headers() { let _ = env_logger::try_init();