Avoid prematurely unlinking streams in send_reset, in some cases. (#319)
Because `send_reset` called `recv_err`, which calls `reclaim_all_capacity`, which eventually calls `transition(stream, ..)` -- all of which happens _before_ the RESET frame is enqueued -- it was possible for the stream to get unlinked from the store (if there was any connection-level capacity to reassign). This could then cause the stream to get "leaked" on drop/EOF since it would no longer be iterated. Fix this by delaying the call to `reclaim_all_capacity` _after_ enqueueing the RESET frame. A test demonstrating the issue is included.
This commit is contained in:
committed by
Carl Lerche
parent
9bbbe7ebd5
commit
ea8b8ac2fd
@@ -159,12 +159,17 @@ impl Send {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
self.recv_err(buffer, stream, counts);
|
// Clear all pending outbound frames.
|
||||||
|
// Note that we don't call `self.recv_err` because we want to enqueue
|
||||||
|
// the reset frame before transitioning the stream inside
|
||||||
|
// `reclaim_all_capacity`.
|
||||||
|
self.prioritize.clear_queue(buffer, stream);
|
||||||
|
|
||||||
let frame = frame::Reset::new(stream.id, reason);
|
let frame = frame::Reset::new(stream.id, reason);
|
||||||
|
|
||||||
trace!("send_reset -- queueing; frame={:?}", frame);
|
trace!("send_reset -- queueing; frame={:?}", frame);
|
||||||
self.prioritize.queue_frame(frame.into(), buffer, stream, task);
|
self.prioritize.queue_frame(frame.into(), buffer, stream, task);
|
||||||
|
self.prioritize.reclaim_all_capacity(stream, counts);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn schedule_implicit_reset(
|
pub fn schedule_implicit_reset(
|
||||||
|
|||||||
@@ -1061,6 +1061,79 @@ fn drop_pending_open() {
|
|||||||
client.join(srv).wait().unwrap();
|
client.join(srv).wait().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn malformed_response_headers_dont_unlink_stream() {
|
||||||
|
// This test checks that receiving malformed headers frame on a stream with
|
||||||
|
// no remaining references correctly resets the stream, without prematurely
|
||||||
|
// unlinking it.
|
||||||
|
let _ = ::env_logger::try_init();
|
||||||
|
|
||||||
|
let (io, srv) = mock::new();
|
||||||
|
let (drop_tx, drop_rx) = futures::sync::oneshot::channel();
|
||||||
|
let (queued_tx, queued_rx) = futures::sync::oneshot::channel();
|
||||||
|
|
||||||
|
let srv = srv
|
||||||
|
.assert_client_handshake()
|
||||||
|
.unwrap()
|
||||||
|
.recv_settings()
|
||||||
|
.recv_frame(frames::headers(1).request("GET", "http://example.com/"))
|
||||||
|
.recv_frame(frames::headers(3).request("GET", "http://example.com/"))
|
||||||
|
.recv_frame(frames::headers(5).request("GET", "http://example.com/"))
|
||||||
|
.map(move |h| {
|
||||||
|
drop_tx.send(()).unwrap();
|
||||||
|
h
|
||||||
|
})
|
||||||
|
.wait_for(queued_rx)
|
||||||
|
.send_bytes(&[
|
||||||
|
// 2 byte frame
|
||||||
|
0, 0, 2,
|
||||||
|
// type: HEADERS
|
||||||
|
1,
|
||||||
|
// flags: END_STREAM | END_HEADERS
|
||||||
|
5,
|
||||||
|
// stream identifier: 3
|
||||||
|
0, 0, 0, 3,
|
||||||
|
// data - invalid (pseudo not at end of block)
|
||||||
|
144, 135
|
||||||
|
// Per the spec, this frame should cause a stream error of type
|
||||||
|
// PROTOCOL_ERROR.
|
||||||
|
])
|
||||||
|
.close()
|
||||||
|
;
|
||||||
|
|
||||||
|
fn request() -> Request<()> {
|
||||||
|
Request::builder()
|
||||||
|
.uri("http://example.com/")
|
||||||
|
.body(())
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
let client = client::Builder::new()
|
||||||
|
.handshake::<_, Bytes>(io)
|
||||||
|
.expect("handshake")
|
||||||
|
.and_then(move |(mut client, conn)| {
|
||||||
|
let (_req1, mut send1) = client.send_request(
|
||||||
|
request(), false).unwrap();
|
||||||
|
// Use up most of the connection window.
|
||||||
|
send1.send_data(vec![0; 65534].into(), true).unwrap();
|
||||||
|
let (req2, mut send2) = client.send_request(
|
||||||
|
request(), false).unwrap();
|
||||||
|
let (req3, mut send3) = client.send_request(
|
||||||
|
request(), false).unwrap();
|
||||||
|
conn.expect("h2").join(drop_rx.then(move |_| {
|
||||||
|
// Use up the remainder of the connection window.
|
||||||
|
send2.send_data(vec![0; 2].into(), true).unwrap();
|
||||||
|
// Queue up for more connection window.
|
||||||
|
send3.send_data(vec![0; 1].into(), true).unwrap();
|
||||||
|
queued_tx.send(()).unwrap();
|
||||||
|
Ok((req2, req3))
|
||||||
|
}))
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
client.join(srv).wait().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
|
const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
|
||||||
const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
|
const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user