Notify connection on connection window expansion (#86)
When capacity is released back to the connection and a connection level window update needs to be sent out, the connection task needs to be notified in order for the send to actually happen.
This commit is contained in:
@@ -222,6 +222,12 @@ where
|
|||||||
self.flow.assign_capacity(capacity);
|
self.flow.assign_capacity(capacity);
|
||||||
stream.recv_flow.assign_capacity(capacity);
|
stream.recv_flow.assign_capacity(capacity);
|
||||||
|
|
||||||
|
if self.flow.unclaimed_capacity().is_some() {
|
||||||
|
if let Some(task) = task.take() {
|
||||||
|
task.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if stream.recv_flow.unclaimed_capacity().is_some() {
|
if stream.recv_flow.unclaimed_capacity().is_some() {
|
||||||
// Queue the stream for sending the WINDOW_UPDATE frame.
|
// Queue the stream for sending the WINDOW_UPDATE frame.
|
||||||
self.pending_window_updates.push(stream);
|
self.pending_window_updates.push(stream);
|
||||||
|
|||||||
@@ -669,3 +669,112 @@ fn reserved_capacity_assigned_in_multi_window_updates() {
|
|||||||
|
|
||||||
let _ = h2.join(srv).wait().unwrap();
|
let _ = h2.join(srv).wait().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn connection_notified_on_released_capacity() {
|
||||||
|
use futures::sync::oneshot;
|
||||||
|
use std::thread;
|
||||||
|
use std::sync::mpsc;
|
||||||
|
|
||||||
|
let _ = ::env_logger::init();
|
||||||
|
let (io, srv) = mock::new();
|
||||||
|
|
||||||
|
// We're going to run the connection on a thread in order to isolate task
|
||||||
|
// notifications. This test is here, in part, to ensure that the connection
|
||||||
|
// receives the appropriate notifications to send out window updates.
|
||||||
|
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
|
// Because threading is fun
|
||||||
|
let (settings_tx, settings_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let th1 = thread::spawn(move || {
|
||||||
|
srv.assert_client_handshake().unwrap()
|
||||||
|
.recv_settings()
|
||||||
|
.map(move |v| {
|
||||||
|
settings_tx.send(()).unwrap();
|
||||||
|
v
|
||||||
|
})
|
||||||
|
// Get the first request
|
||||||
|
.recv_frame(
|
||||||
|
frames::headers(1)
|
||||||
|
.request("GET", "https://example.com/a")
|
||||||
|
.eos())
|
||||||
|
// Get the second request
|
||||||
|
.recv_frame(
|
||||||
|
frames::headers(3)
|
||||||
|
.request("GET", "https://example.com/b")
|
||||||
|
.eos())
|
||||||
|
// Send the first response
|
||||||
|
.send_frame(frames::headers(1).response(200))
|
||||||
|
// Send the second response
|
||||||
|
.send_frame(frames::headers(3).response(200))
|
||||||
|
|
||||||
|
// Fill the connection window
|
||||||
|
.send_frame(frames::data(1, vec![0u8; 16_384]).eos())
|
||||||
|
.idle_ms(100)
|
||||||
|
.send_frame(frames::data(3, vec![0u8; 16_384]).eos())
|
||||||
|
|
||||||
|
// The window update is sent
|
||||||
|
.recv_frame(frames::window_update(0, 16_384))
|
||||||
|
.map(drop)
|
||||||
|
.wait().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
let th2 = thread::spawn(move || {
|
||||||
|
let h2 = Client::handshake(io).wait().unwrap();
|
||||||
|
|
||||||
|
let (mut h2, _) = h2.drive(settings_rx).wait().unwrap();
|
||||||
|
|
||||||
|
let request = Request::get("https://example.com/a")
|
||||||
|
.body(())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
tx.send(h2.request(request, true).unwrap()).unwrap();
|
||||||
|
|
||||||
|
let request = Request::get("https://example.com/b")
|
||||||
|
.body(())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
tx.send(h2.request(request, true).unwrap()).unwrap();
|
||||||
|
|
||||||
|
// Run the connection to completion
|
||||||
|
h2.wait().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Get the two requests
|
||||||
|
let a = rx.recv().unwrap();
|
||||||
|
let b = rx.recv().unwrap();
|
||||||
|
|
||||||
|
// Get the first response
|
||||||
|
let response = a.wait().unwrap();
|
||||||
|
assert_eq!(response.status(), StatusCode::OK);
|
||||||
|
let (_, a) = response.into_parts();
|
||||||
|
|
||||||
|
// Get the next chunk
|
||||||
|
let (chunk, mut a) = a.into_future().wait().unwrap();
|
||||||
|
assert_eq!(16_384, chunk.unwrap().len());
|
||||||
|
|
||||||
|
// Get the second response
|
||||||
|
let response = b.wait().unwrap();
|
||||||
|
assert_eq!(response.status(), StatusCode::OK);
|
||||||
|
let (_, b) = response.into_parts();
|
||||||
|
|
||||||
|
// Get the next chunk
|
||||||
|
let (chunk, b) = b.into_future().wait().unwrap();
|
||||||
|
assert_eq!(16_384, chunk.unwrap().len());
|
||||||
|
|
||||||
|
// Wait a bit
|
||||||
|
thread::sleep(Duration::from_millis(100));
|
||||||
|
|
||||||
|
// Release the capacity
|
||||||
|
a.release_capacity(16_384).unwrap();
|
||||||
|
|
||||||
|
th1.join().unwrap();
|
||||||
|
th2.join().unwrap();
|
||||||
|
|
||||||
|
// Explicitly drop this after the joins so that the capacity doesn't get
|
||||||
|
// implicitly released before.
|
||||||
|
drop(b);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user