diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs
index d63aa33..c543668 100644
--- a/src/proto/streams/prioritize.rs
+++ b/src/proto/streams/prioritize.rs
@@ -555,6 +555,9 @@ impl Prioritize {
         while let Some(frame) = stream.pending_send.pop_front(buffer) {
             trace!("dropping; frame={:?}", frame);
         }
+
+        stream.buffered_send_data = 0;
+        stream.requested_send_capacity = 0;
     }

     fn pop_frame(
@@ -574,6 +577,14 @@ impl Prioritize {
                 Some(mut stream) => {
                     trace!("pop_frame; stream={:?}", stream.id);

+                    // If the stream receives a RESET from the peer, it may have
+                    // had data buffered to be sent, but all the frames are cleared
+                    // in clear_queue(). Instead of doing an O(N) queue traversal to
+                    // remove them, let's just ignore peer_reset streams here.
+                    if stream.state.is_peer_reset() {
+                        continue;
+                    }
+
                     // It's possible that this stream, besides having data to send,
                     // is also queued to send a reset, and thus is already in the queue
                     // to wait for "some time" after a reset.
diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs
index 8dc2874..2dc587c 100644
--- a/src/proto/streams/recv.rs
+++ b/src/proto/streams/recv.rs
@@ -588,16 +588,12 @@ impl Recv {
     }

     /// Handle remote sending an explicit RST_STREAM.
-    pub fn recv_reset(
-        &mut self,
-        frame: frame::Reset,
-        stream: &mut Stream,
-    ) -> Result<(), RecvError> {
+    pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) {
         // Notify the stream
         stream.state.recv_reset(frame.reason());

+        stream.notify_send();
         stream.notify_recv();
-        Ok(())
     }

     /// Handle a received error
diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs
index 2b92afc..b3f4716 100644
--- a/src/proto/streams/send.rs
+++ b/src/proto/streams/send.rs
@@ -283,6 +283,15 @@ impl Send {
         Ok(())
     }

+    pub fn recv_reset(
+        &mut self,
+        buffer: &mut Buffer<Frame<B>>,
+        stream: &mut store::Ptr,
+    ) {
+        // Clear all pending outbound frames
+        self.prioritize.clear_queue(buffer, stream);
+    }
+
     pub fn recv_err(
         &mut self,
         buffer: &mut Buffer<Frame<B>>,
diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs
index 0952070..94f093d 100644
--- a/src/proto/streams/state.rs
+++ b/src/proto/streams/state.rs
@@ -301,6 +301,13 @@ impl State {
         }
     }

+    pub fn is_peer_reset(&self) -> bool {
+        match self.inner {
+            Closed(Cause::Proto(_)) => true,
+            _ => false,
+        }
+    }
+
     /// Returns true if the stream is already reset.
     pub fn is_reset(&self) -> bool {
         match self.inner {
diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs
index eeac656..00970e2 100644
--- a/src/proto/streams/streams.rs
+++ b/src/proto/streams/streams.rs
@@ -255,10 +255,14 @@ where
             },
         };

+        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
+        let send_buffer = &mut *send_buffer;
+
         let actions = &mut me.actions;

         me.counts.transition(stream, |_, stream| {
-            actions.recv.recv_reset(frame, stream)?;
+            actions.recv.recv_reset(frame, stream);
+            actions.send.recv_reset(send_buffer, stream);
             assert!(stream.state.is_closed());
             Ok(())
         })
diff --git a/tests/client_request.rs b/tests/client_request.rs
index a2c09d9..e785719 100644
--- a/tests/client_request.rs
+++ b/tests/client_request.rs
@@ -708,6 +708,69 @@ fn recv_too_big_headers() {
 }

+#[test]
+fn pending_send_request_gets_reset_by_peer_properly() {
+    let _ = ::env_logger::try_init();
+    let (io, srv) = mock::new();
+
+    let payload = [0; (frame::DEFAULT_INITIAL_WINDOW_SIZE * 2) as usize];
+    let max_frame_size = frame::DEFAULT_MAX_FRAME_SIZE as usize;
+
+    let srv = srv.assert_client_handshake()
+        .unwrap()
+        .recv_settings()
+        .recv_frame(
+            frames::headers(1)
+                .request("GET", "https://http2.akamai.com/"),
+        )
+        // Note that we can only send up to ~4 frames of data by default
+        .recv_frame(frames::data(1, &payload[0..max_frame_size]))
+        .recv_frame(frames::data(1, &payload[max_frame_size..(max_frame_size*2)]))
+        .recv_frame(frames::data(1, &payload[(max_frame_size*2)..(max_frame_size*3)]))
+        .recv_frame(frames::data(1, &payload[(max_frame_size*3)..(max_frame_size*4-1)]))
+
+        .idle_ms(100)
+
+        .send_frame(frames::reset(1).refused())
+        // Because all active requests are finished, the connection should shut
+        // down and send a GO_AWAY frame. If the reset stream is bugged (and
+        // doesn't count towards the concurrency limit), then the connection
+        // will not send a GO_AWAY and this test will fail.
+        .recv_frame(frames::go_away(0))
+
+        .close();
+
+    let client = client::Builder::new()
+        .handshake::<_, Bytes>(io)
+        .expect("handshake")
+        .and_then(|(mut client, conn)| {
+            let request = Request::builder()
+                .uri("https://http2.akamai.com/")
+                .body(())
+                .unwrap();
+
+            let (response, mut stream) = client
+                .send_request(request, false)
+                .expect("send_request");
+
+            let response = response.expect_err("response")
+                .map(|err| {
+                    assert_eq!(
+                        err.reason(),
+                        Some(Reason::REFUSED_STREAM)
+                    );
+                });
+
+            // Send the data
+            stream.send_data(payload[..].into(), true).unwrap();
+
+            conn.drive(response)
+                .and_then(|(conn, _)| conn.expect("client"))
+        });
+
+    client.join(srv).wait().expect("wait");
+}
+
 #[test]
 fn request_without_path() {
     let _ = ::env_logger::try_init();
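
A minimal sketch of the two ideas this patch combines, for illustration only: a stream counts as
"peer reset" only when it closed because of a frame from the peer (Closed(Cause::Proto(_)) in
state.rs), and the send path skips such streams lazily when popping (O(1) per pop) instead of
scanning the whole pending-send queue at reset time (O(N)). The enum names, the `pending` vector,
and the stream labels below are illustrative stand-ins, not h2's actual internals.

    // Hypothetical, simplified model of the check added in this patch.
    // `StreamState` and `Cause` stand in for h2's internal state types.
    #[derive(Debug)]
    enum Cause {
        Proto(u32),        // peer sent RST_STREAM (e.g. 0x7 = REFUSED_STREAM)
        LocallyReset(u32), // we reset the stream ourselves (e.g. 0x8 = CANCEL)
    }

    #[derive(Debug)]
    enum StreamState {
        Open,
        Closed(Cause),
    }

    impl StreamState {
        // Mirrors the added State::is_peer_reset(): only a close caused by the
        // peer (Cause::Proto) counts; a locally initiated reset does not.
        fn is_peer_reset(&self) -> bool {
            match *self {
                StreamState::Closed(Cause::Proto(_)) => true,
                _ => false,
            }
        }
    }

    fn main() {
        // A send queue holding a mix of streams. Skipping peer-reset entries
        // while popping mirrors the `continue` added to Prioritize::pop_frame().
        let pending = vec![
            ("stream 1", StreamState::Closed(Cause::Proto(0x7))),
            ("stream 3", StreamState::Open),
            ("stream 5", StreamState::Closed(Cause::LocallyReset(0x8))),
        ];

        for (id, state) in &pending {
            if state.is_peer_reset() {
                println!("{}: reset by peer, skip it", id);
                continue;
            }
            println!("{}: not a peer reset, pop normally", id);
        }
    }

The trade-off is the one named in the pop_frame comment: a peer reset leaves stale entries in the
pending-send queue, and they are discarded when encountered rather than removed up front.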