From f84a1bdd1f0aa536c1d79b78ddccef3ebb870877 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 14 Sep 2017 13:50:52 -0700 Subject: [PATCH] Notify connection on connection window expansion (#86) When capacity is released back to the connection and a connection level window update needs to be sent out, the connection task needs to be notified in order for the send to actually happen. --- src/proto/streams/recv.rs | 6 +++ tests/flow_control.rs | 109 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+) diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index c810edf..5deaba2 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -222,6 +222,12 @@ where self.flow.assign_capacity(capacity); stream.recv_flow.assign_capacity(capacity); + if self.flow.unclaimed_capacity().is_some() { + if let Some(task) = task.take() { + task.notify(); + } + } + if stream.recv_flow.unclaimed_capacity().is_some() { // Queue the stream for sending the WINDOW_UPDATE frame. self.pending_window_updates.push(stream); diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 47f8350..e2aa599 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -669,3 +669,112 @@ fn reserved_capacity_assigned_in_multi_window_updates() { let _ = h2.join(srv).wait().unwrap(); } + +#[test] +fn connection_notified_on_released_capacity() { + use futures::sync::oneshot; + use std::thread; + use std::sync::mpsc; + + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + // We're going to run the connection on a thread in order to isolate task + // notifications. This test is here, in part, to ensure that the connection + // receives the appropriate notifications to send out window updates. + + let (tx, rx) = mpsc::channel(); + + // Because threading is fun + let (settings_tx, settings_rx) = oneshot::channel(); + + let th1 = thread::spawn(move || { + srv.assert_client_handshake().unwrap() + .recv_settings() + .map(move |v| { + settings_tx.send(()).unwrap(); + v + }) + // Get the first request + .recv_frame( + frames::headers(1) + .request("GET", "https://example.com/a") + .eos()) + // Get the second request + .recv_frame( + frames::headers(3) + .request("GET", "https://example.com/b") + .eos()) + // Send the first response + .send_frame(frames::headers(1).response(200)) + // Send the second response + .send_frame(frames::headers(3).response(200)) + + // Fill the connection window + .send_frame(frames::data(1, vec![0u8; 16_384]).eos()) + .idle_ms(100) + .send_frame(frames::data(3, vec![0u8; 16_384]).eos()) + + // The window update is sent + .recv_frame(frames::window_update(0, 16_384)) + .map(drop) + .wait().unwrap(); + }); + + + let th2 = thread::spawn(move || { + let h2 = Client::handshake(io).wait().unwrap(); + + let (mut h2, _) = h2.drive(settings_rx).wait().unwrap(); + + let request = Request::get("https://example.com/a") + .body(()) + .unwrap(); + + tx.send(h2.request(request, true).unwrap()).unwrap(); + + let request = Request::get("https://example.com/b") + .body(()) + .unwrap(); + + tx.send(h2.request(request, true).unwrap()).unwrap(); + + // Run the connection to completion + h2.wait().unwrap(); + }); + + // Get the two requests + let a = rx.recv().unwrap(); + let b = rx.recv().unwrap(); + + // Get the first response + let response = a.wait().unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let (_, a) = response.into_parts(); + + // Get the next chunk + let (chunk, mut a) = a.into_future().wait().unwrap(); + assert_eq!(16_384, chunk.unwrap().len()); + + // Get the second response + let response = b.wait().unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let (_, b) = response.into_parts(); + + // Get the next chunk + let (chunk, b) = b.into_future().wait().unwrap(); + assert_eq!(16_384, chunk.unwrap().len()); + + // Wait a bit + thread::sleep(Duration::from_millis(100)); + + // Release the capacity + a.release_capacity(16_384).unwrap(); + + th1.join().unwrap(); + th2.join().unwrap(); + + // Explicitly drop this after the joins so that the capacity doesn't get + // implicitly released before. + drop(b); +}