Fix potential hang if extensions contain same StreamRef

If a user stored a `StreamRef` to the same stream in the request or
response extensions, they would be dropped while the internal store lock
was held. That would lead to a deadlock, since dropping a stream ref
will try to take the store lock to clean up.

Clear extensions of Request and Response before locking store, prevent
this.

Fixes hyperium/hyper#2621
This commit is contained in:
Sean McArthur
2021-08-20 14:30:49 -07:00
parent 288a5f086f
commit ab6f148ee1
2 changed files with 47 additions and 3 deletions

View File

@@ -207,13 +207,16 @@ where
pub fn send_request( pub fn send_request(
&mut self, &mut self,
request: Request<()>, mut request: Request<()>,
end_of_stream: bool, end_of_stream: bool,
pending: Option<&OpaqueStreamRef>, pending: Option<&OpaqueStreamRef>,
) -> Result<StreamRef<B>, SendError> { ) -> Result<StreamRef<B>, SendError> {
use super::stream::ContentLength; use super::stream::ContentLength;
use http::Method; use http::Method;
// Clear before taking lock, incase extensions contain a StreamRef.
request.extensions_mut().clear();
// TODO: There is a hazard with assigning a stream ID before the // TODO: There is a hazard with assigning a stream ID before the
// prioritize layer. If prioritization reorders new streams, this // prioritize layer. If prioritization reorders new streams, this
// implicitly closes the earlier stream IDs. // implicitly closes the earlier stream IDs.
@@ -1062,9 +1065,11 @@ impl<B> StreamRef<B> {
pub fn send_response( pub fn send_response(
&mut self, &mut self,
response: Response<()>, mut response: Response<()>,
end_of_stream: bool, end_of_stream: bool,
) -> Result<(), UserError> { ) -> Result<(), UserError> {
// Clear before taking lock, incase extensions contain a StreamRef.
response.extensions_mut().clear();
let mut me = self.opaque.inner.lock().unwrap(); let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
@@ -1082,7 +1087,12 @@ impl<B> StreamRef<B> {
}) })
} }
pub fn send_push_promise(&mut self, request: Request<()>) -> Result<StreamRef<B>, UserError> { pub fn send_push_promise(
&mut self,
mut request: Request<()>,
) -> Result<StreamRef<B>, UserError> {
// Clear before taking lock, incase extensions contain a StreamRef.
request.extensions_mut().clear();
let mut me = self.opaque.inner.lock().unwrap(); let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;

View File

@@ -1059,3 +1059,37 @@ async fn request_without_authority() {
join(client, srv).await; join(client, srv).await;
} }
#[tokio::test]
async fn serve_when_request_in_response_extensions() {
h2_support::trace_init!();
let (io, mut client) = mock::new();
let client = async move {
let settings = client.assert_server_handshake().await;
assert_default_settings!(settings);
client
.send_frame(
frames::headers(1)
.request("GET", "https://example.com/")
.eos(),
)
.await;
client
.recv_frame(frames::headers(1).response(200).eos())
.await;
};
let srv = async move {
let mut srv = server::handshake(io).await.expect("handshake");
let (req, mut stream) = srv.next().await.unwrap().unwrap();
let mut rsp = http::Response::new(());
rsp.extensions_mut().insert(req);
stream.send_response(rsp, true).unwrap();
assert!(srv.next().await.is_none());
};
join(client, srv).await;
}