diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index a20b258..12f34eb 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -699,9 +699,6 @@ impl Prioritize { counts.inc_num_send_streams(); self.pending_send.push(&mut stream); - if let Some(task) = stream.open_task.take() { - task.notify(); - } } else { return; } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 775598f..8dc2874 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -595,6 +595,7 @@ impl Recv { ) -> Result<(), RecvError> { // Notify the stream stream.state.recv_reset(frame.reason()); + stream.notify_send(); stream.notify_recv(); Ok(()) } @@ -605,6 +606,7 @@ impl Recv { stream.state.recv_err(err); // If a receiver is waiting, notify it + stream.notify_send(); stream.notify_recv(); } diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 658e621..73dfd61 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -64,9 +64,6 @@ pub(super) struct Stream { /// Set to true when the stream is pending to be opened pub is_pending_open: bool, - /// Task tracking when stream can be "opened", or initially sent to socket. - pub open_task: Option, - // ===== Fields related to receiving ===== /// Next node in the accept linked list pub next_pending_accept: Option, @@ -168,7 +165,6 @@ impl Stream { send_capacity_inc: false, is_pending_open: false, next_open: None, - open_task: None, // ===== Fields related to receiving ===== next_pending_accept: None, diff --git a/tests/client_request.rs b/tests/client_request.rs index 4824de8..86297d8 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -282,6 +282,84 @@ fn request_over_max_concurrent_streams_errors() { h2.join(srv).wait().expect("wait"); } +#[test] +fn send_request_poll_ready_when_connection_error() { + let _ = ::env_logger::try_init(); + let (io, srv) = mock::new(); + + + let srv = srv.assert_client_handshake_with_settings(frames::settings() + // super tiny server + .max_concurrent_streams(1)) + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("POST", "https://example.com/") + .eos(), + ) + .send_frame(frames::headers(1).response(200).eos()) + .recv_frame(frames::headers(3).request("POST", "https://example.com/").eos()) + .send_frame(frames::headers(8).response(200).eos()) + //.recv_frame(frames::headers(5).request("POST", "https://example.com/").eos()) + .close(); + + let h2 = client::handshake(io) + .expect("handshake") + .and_then(|(mut client, h2)| { + // we send a simple req here just to drive the connection so we can + // receive the server settings. + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + + // first request is allowed + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).map(move |(h2, _)| (client, h2)) + }) + .and_then(|(mut client, h2)| { + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + + // first request is allowed + let (resp1, _) = client.send_request(request, true).unwrap(); + + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + + // second request is put into pending_open + let (resp2, _) = client.send_request(request, true).unwrap(); + + // third stream is over max concurrent + let until_ready = futures::future::poll_fn(move || { + client.poll_ready() + }).expect_err("client poll_ready").then(|_| Ok(())); + + // a FuturesUnordered is used on purpose! + // + // We don't want a join, since any of the other futures notifying + // will make the until_ready future polled again, but we are + // specifically testing that until_ready gets notified on its own. + let mut unordered = futures::stream::FuturesUnordered::>>::new(); + unordered.push(Box::new(until_ready)); + unordered.push(Box::new(h2.expect_err("client conn").then(|_| Ok(())))); + unordered.push(Box::new(resp1.expect_err("req1").then(|_| Ok(())))); + unordered.push(Box::new(resp2.expect_err("req2").then(|_| Ok(())))); + + unordered.for_each(|_| Ok(())) + }); + + h2.join(srv).wait().expect("wait"); +} + #[test] fn http_11_request_without_scheme_or_authority() { let _ = ::env_logger::try_init();