Fix receiving a GOAWAY frame from updating the max recv ID
Receiving a GOAWAY should update the max *send* ID, it shouldn't affect the max recv.
This commit is contained in:
@@ -18,6 +18,15 @@ pub(super) struct Send {
|
|||||||
/// Stream identifier to use for next initialized stream.
|
/// Stream identifier to use for next initialized stream.
|
||||||
next_stream_id: Result<StreamId, StreamIdOverflow>,
|
next_stream_id: Result<StreamId, StreamIdOverflow>,
|
||||||
|
|
||||||
|
/// Any streams with a higher ID are ignored.
|
||||||
|
///
|
||||||
|
/// This starts as MAX, but is lowered when a GOAWAY is received.
|
||||||
|
///
|
||||||
|
/// > After sending a GOAWAY frame, the sender can discard frames for
|
||||||
|
/// > streams initiated by the receiver with identifiers higher than
|
||||||
|
/// > the identified last stream.
|
||||||
|
max_stream_id: StreamId,
|
||||||
|
|
||||||
/// Initial window size of locally initiated streams
|
/// Initial window size of locally initiated streams
|
||||||
init_window_sz: WindowSize,
|
init_window_sz: WindowSize,
|
||||||
|
|
||||||
@@ -37,6 +46,7 @@ impl Send {
|
|||||||
pub fn new(config: &Config) -> Self {
|
pub fn new(config: &Config) -> Self {
|
||||||
Send {
|
Send {
|
||||||
init_window_sz: config.remote_init_window_sz,
|
init_window_sz: config.remote_init_window_sz,
|
||||||
|
max_stream_id: StreamId::MAX,
|
||||||
next_stream_id: Ok(config.local_next_stream_id),
|
next_stream_id: Ok(config.local_next_stream_id),
|
||||||
prioritize: Prioritize::new(config),
|
prioritize: Prioritize::new(config),
|
||||||
}
|
}
|
||||||
@@ -370,6 +380,26 @@ impl Send {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), RecvError> {
|
||||||
|
if last_stream_id > self.max_stream_id {
|
||||||
|
// The remote endpoint sent a `GOAWAY` frame indicating a stream
|
||||||
|
// that we never sent, or that we have already terminated on account
|
||||||
|
// of previous `GOAWAY` frame. In either case, that is illegal.
|
||||||
|
// (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase
|
||||||
|
// the value they send in the last stream identifier, since the
|
||||||
|
// peers might already have retried unprocessed requests on another
|
||||||
|
// connection.")
|
||||||
|
proto_err!(conn:
|
||||||
|
"recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})",
|
||||||
|
last_stream_id, self.max_stream_id,
|
||||||
|
);
|
||||||
|
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.max_stream_id = last_stream_id;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn recv_err<B>(
|
pub fn recv_err<B>(
|
||||||
&mut self,
|
&mut self,
|
||||||
buffer: &mut Buffer<Frame<B>>,
|
buffer: &mut Buffer<Frame<B>>,
|
||||||
|
|||||||
@@ -392,25 +392,11 @@ where
|
|||||||
let send_buffer = &mut *send_buffer;
|
let send_buffer = &mut *send_buffer;
|
||||||
|
|
||||||
let last_stream_id = frame.last_stream_id();
|
let last_stream_id = frame.last_stream_id();
|
||||||
|
|
||||||
|
actions.send.recv_go_away(last_stream_id)?;
|
||||||
|
|
||||||
let err = frame.reason().into();
|
let err = frame.reason().into();
|
||||||
|
|
||||||
if last_stream_id > actions.recv.max_stream_id() {
|
|
||||||
// The remote endpoint sent a `GOAWAY` frame indicating a stream
|
|
||||||
// that we never sent, or that we have already terminated on account
|
|
||||||
// of previous `GOAWAY` frame. In either case, that is illegal.
|
|
||||||
// (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase
|
|
||||||
// the value they send in the last stream identifier, since the
|
|
||||||
// peers might already have retried unprocessed requests on another
|
|
||||||
// connection.")
|
|
||||||
proto_err!(conn:
|
|
||||||
"recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})",
|
|
||||||
last_stream_id, actions.recv.max_stream_id(),
|
|
||||||
);
|
|
||||||
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
|
|
||||||
}
|
|
||||||
|
|
||||||
actions.recv.go_away(last_stream_id);
|
|
||||||
|
|
||||||
me.store
|
me.store
|
||||||
.for_each(|stream| {
|
.for_each(|stream| {
|
||||||
if stream.id > last_stream_id {
|
if stream.id > last_stream_id {
|
||||||
|
|||||||
@@ -656,6 +656,55 @@ async fn graceful_shutdown() {
|
|||||||
join(client, srv).await;
|
join(client, srv).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn goaway_even_if_client_sent_goaway() {
|
||||||
|
let _ = env_logger::try_init();
|
||||||
|
let (io, mut client) = mock::new();
|
||||||
|
|
||||||
|
let client = async move {
|
||||||
|
let settings = client.assert_server_handshake().await;
|
||||||
|
assert_default_settings!(settings);
|
||||||
|
client
|
||||||
|
.send_frame(
|
||||||
|
frames::headers(5)
|
||||||
|
.request("GET", "https://example.com/")
|
||||||
|
.eos(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
// Ping-pong so as to wait until server gets req
|
||||||
|
client.ping_pong([0; 8]).await;
|
||||||
|
client.send_frame(frames::go_away(0)).await;
|
||||||
|
// 2^31 - 1 = 2147483647
|
||||||
|
// Note: not using a constant in the library because library devs
|
||||||
|
// can be unsmart.
|
||||||
|
client.recv_frame(frames::go_away(2147483647)).await;
|
||||||
|
client.recv_frame(frames::ping(frame::Ping::SHUTDOWN)).await;
|
||||||
|
client
|
||||||
|
.recv_frame(frames::headers(5).response(200).eos())
|
||||||
|
.await;
|
||||||
|
client
|
||||||
|
.send_frame(frames::ping(frame::Ping::SHUTDOWN).pong())
|
||||||
|
.await;
|
||||||
|
client.recv_frame(frames::go_away(5)).await;
|
||||||
|
client.recv_eof().await;
|
||||||
|
};
|
||||||
|
|
||||||
|
let srv = async move {
|
||||||
|
let mut srv = server::handshake(io).await.expect("handshake");
|
||||||
|
let (req, mut stream) = srv.next().await.unwrap().unwrap();
|
||||||
|
assert_eq!(req.method(), &http::Method::GET);
|
||||||
|
|
||||||
|
srv.graceful_shutdown();
|
||||||
|
|
||||||
|
let rsp = http::Response::builder().status(200).body(()).unwrap();
|
||||||
|
stream.send_response(rsp, true).unwrap();
|
||||||
|
|
||||||
|
assert!(srv.next().await.is_none(), "unexpected request");
|
||||||
|
};
|
||||||
|
|
||||||
|
join(client, srv).await;
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn sends_reset_cancel_when_res_body_is_dropped() {
|
async fn sends_reset_cancel_when_res_body_is_dropped() {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|||||||
@@ -330,6 +330,40 @@ async fn recv_goaway_finishes_processed_streams() {
|
|||||||
join(srv, h2).await;
|
join(srv, h2).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn recv_goaway_with_higher_last_processed_id() {
|
||||||
|
let _ = env_logger::try_init();
|
||||||
|
let (io, mut srv) = mock::new();
|
||||||
|
|
||||||
|
let srv = async move {
|
||||||
|
let settings = srv.assert_client_handshake().await;
|
||||||
|
assert_default_settings!(settings);
|
||||||
|
srv.recv_frame(
|
||||||
|
frames::headers(1)
|
||||||
|
.request("GET", "https://example.com/")
|
||||||
|
.eos(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
srv.send_frame(frames::go_away(1)).await;
|
||||||
|
// a bigger goaway? kaboom
|
||||||
|
srv.send_frame(frames::go_away(3)).await;
|
||||||
|
// expecting a goaway of 0, since server never initiated a stream
|
||||||
|
srv.recv_frame(frames::go_away(0).protocol_error()).await;
|
||||||
|
//.close();
|
||||||
|
};
|
||||||
|
|
||||||
|
let client = async move {
|
||||||
|
let (mut client, mut conn) = client::handshake(io).await.expect("handshake");
|
||||||
|
let err = conn
|
||||||
|
.drive(client.get("https://example.com"))
|
||||||
|
.await
|
||||||
|
.expect_err("client should error");
|
||||||
|
assert_eq!(err.reason(), Some(Reason::PROTOCOL_ERROR));
|
||||||
|
};
|
||||||
|
|
||||||
|
join(srv, client).await;
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn recv_next_stream_id_updated_by_malformed_headers() {
|
async fn recv_next_stream_id_updated_by_malformed_headers() {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|||||||
Reference in New Issue
Block a user