Split response future from client::Stream (#153)
This commit is contained in:
@@ -35,7 +35,7 @@ fn send_data_without_requesting_capacity() {
|
||||
.body(())
|
||||
.unwrap();
|
||||
|
||||
let mut stream = client.send_request(request, false).unwrap();
|
||||
let (response, mut stream) = client.send_request(request, false).unwrap();
|
||||
|
||||
// The capacity should be immediately allocated
|
||||
assert_eq!(stream.capacity(), 0);
|
||||
@@ -44,7 +44,7 @@ fn send_data_without_requesting_capacity() {
|
||||
stream.send_data(payload[..].into(), true).unwrap();
|
||||
|
||||
// Get the response
|
||||
let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap();
|
||||
let resp = h2.run(response).unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
h2.wait().unwrap();
|
||||
@@ -90,6 +90,7 @@ fn release_capacity_sends_window_update() {
|
||||
.unwrap();
|
||||
|
||||
let req = client.send_request(request, true).unwrap()
|
||||
.0
|
||||
.unwrap()
|
||||
// Get the response
|
||||
.and_then(|resp| {
|
||||
@@ -117,6 +118,7 @@ fn release_capacity_sends_window_update() {
|
||||
assert_eq!(buf.unwrap().len(), payload.len());
|
||||
Ok(())
|
||||
});
|
||||
|
||||
h2.unwrap().join(req)
|
||||
});
|
||||
h2.join(mock).wait().unwrap();
|
||||
@@ -153,6 +155,7 @@ fn release_capacity_of_small_amount_does_not_send_window_update() {
|
||||
.unwrap();
|
||||
|
||||
let req = client.send_request(request, true).unwrap()
|
||||
.0
|
||||
.unwrap()
|
||||
// Get the response
|
||||
.and_then(|resp| {
|
||||
@@ -222,7 +225,7 @@ fn recv_data_overflows_connection_window() {
|
||||
let req = client
|
||||
.send_request(request, true)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.0.unwrap()
|
||||
.and_then(|resp| {
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = resp.into_parts().1;
|
||||
@@ -292,7 +295,7 @@ fn recv_data_overflows_stream_window() {
|
||||
let req = client
|
||||
.send_request(request, true)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.0.unwrap()
|
||||
.and_then(|resp| {
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = resp.into_parts().1;
|
||||
@@ -343,7 +346,7 @@ fn stream_close_by_data_frame_releases_capacity() {
|
||||
.unwrap();
|
||||
|
||||
// Send request
|
||||
let mut s1 = client.send_request(request, false).unwrap();
|
||||
let (resp1, mut s1) = client.send_request(request, false).unwrap();
|
||||
|
||||
// This effectively reserves the entire connection window
|
||||
s1.reserve_capacity(window_size);
|
||||
@@ -359,7 +362,7 @@ fn stream_close_by_data_frame_releases_capacity() {
|
||||
.unwrap();
|
||||
|
||||
// Create a second stream
|
||||
let mut s2 = client.send_request(request, false).unwrap();
|
||||
let (resp2, mut s2) = client.send_request(request, false).unwrap();
|
||||
|
||||
// Request capacity
|
||||
s2.reserve_capacity(5);
|
||||
@@ -380,8 +383,8 @@ fn stream_close_by_data_frame_releases_capacity() {
|
||||
|
||||
// Drive both streams to prevent the handles from being dropped
|
||||
// (which will send a RST_STREAM) before the connection is closed.
|
||||
h2.drive(s1)
|
||||
.and_then(move |(h2, _)| h2.drive(s2))
|
||||
h2.drive(resp1)
|
||||
.and_then(move |(h2, _)| h2.drive(resp2))
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
@@ -414,7 +417,7 @@ fn stream_close_by_trailers_frame_releases_capacity() {
|
||||
.unwrap();
|
||||
|
||||
// Send request
|
||||
let mut s1 = client.send_request(request, false).unwrap();
|
||||
let (resp1, mut s1) = client.send_request(request, false).unwrap();
|
||||
|
||||
// This effectively reserves the entire connection window
|
||||
s1.reserve_capacity(window_size);
|
||||
@@ -430,7 +433,7 @@ fn stream_close_by_trailers_frame_releases_capacity() {
|
||||
.unwrap();
|
||||
|
||||
// Create a second stream
|
||||
let mut s2 = client.send_request(request, false).unwrap();
|
||||
let (resp2, mut s2) = client.send_request(request, false).unwrap();
|
||||
|
||||
// Request capacity
|
||||
s2.reserve_capacity(5);
|
||||
@@ -451,8 +454,8 @@ fn stream_close_by_trailers_frame_releases_capacity() {
|
||||
|
||||
// Drive both streams to prevent the handles from being dropped
|
||||
// (which will send a RST_STREAM) before the connection is closed.
|
||||
h2.drive(s1)
|
||||
.and_then(move |(h2, _)| h2.drive(s2))
|
||||
h2.drive(resp1)
|
||||
.and_then(move |(h2, _)| h2.drive(resp2))
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
@@ -484,27 +487,6 @@ fn stream_close_by_send_reset_frame_releases_capacity() {}
|
||||
#[ignore]
|
||||
fn stream_close_by_recv_reset_frame_releases_capacity() {}
|
||||
|
||||
use futures::{Async, Poll};
|
||||
|
||||
struct GetResponse {
|
||||
stream: Option<client::Stream<Bytes>>,
|
||||
}
|
||||
|
||||
impl Future for GetResponse {
|
||||
type Item = (Response<client::Body<Bytes>>, client::Stream<Bytes>);
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let response = match self.stream.as_mut().unwrap().poll_response() {
|
||||
Ok(Async::Ready(v)) => v,
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(e) => panic!("unexpected error; {:?}", e),
|
||||
};
|
||||
|
||||
Ok(Async::Ready((response, self.stream.take().unwrap())))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn recv_window_update_on_stream_closed_by_data_frame() {
|
||||
let _ = ::env_logger::init();
|
||||
@@ -519,12 +501,10 @@ fn recv_window_update_on_stream_closed_by_data_frame() {
|
||||
.body(())
|
||||
.unwrap();
|
||||
|
||||
let stream = client.send_request(request, false).unwrap();
|
||||
let (response, stream) = client.send_request(request, false).unwrap();
|
||||
|
||||
// Wait for the response
|
||||
h2.drive(GetResponse {
|
||||
stream: Some(stream),
|
||||
})
|
||||
h2.drive(response.map(|response| (response, stream)))
|
||||
})
|
||||
.and_then(|(h2, (response, mut stream))| {
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
@@ -570,7 +550,7 @@ fn reserved_capacity_assigned_in_multi_window_updates() {
|
||||
.body(())
|
||||
.unwrap();
|
||||
|
||||
let mut stream = client.send_request(request, false).unwrap();
|
||||
let (response, mut stream) = client.send_request(request, false).unwrap();
|
||||
|
||||
// Consume the capacity
|
||||
let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize];
|
||||
@@ -579,17 +559,17 @@ fn reserved_capacity_assigned_in_multi_window_updates() {
|
||||
// Reserve more data than we want
|
||||
stream.reserve_capacity(10);
|
||||
|
||||
h2.drive(util::wait_for_capacity(stream, 5))
|
||||
h2.drive(
|
||||
util::wait_for_capacity(stream, 5)
|
||||
.map(|stream| (response, stream)))
|
||||
})
|
||||
.and_then(|(h2, mut stream)| {
|
||||
.and_then(|(h2, (response, mut stream))| {
|
||||
stream.send_data("hello".into(), false).unwrap();
|
||||
stream.send_data("world".into(), true).unwrap();
|
||||
|
||||
h2.drive(GetResponse {
|
||||
stream: Some(stream),
|
||||
})
|
||||
h2.drive(response)
|
||||
})
|
||||
.and_then(|(h2, (response, _))| {
|
||||
.and_then(|(h2, response)| {
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Wait for the connection to close
|
||||
@@ -709,8 +689,8 @@ fn connection_notified_on_released_capacity() {
|
||||
});
|
||||
|
||||
// Get the two requests
|
||||
let a = rx.recv().unwrap();
|
||||
let b = rx.recv().unwrap();
|
||||
let (a, _) = rx.recv().unwrap();
|
||||
let (b, _) = rx.recv().unwrap();
|
||||
|
||||
// Get the first response
|
||||
let response = a.wait().unwrap();
|
||||
@@ -777,20 +757,20 @@ fn recv_settings_removes_available_capacity() {
|
||||
.uri("https://http2.akamai.com/")
|
||||
.body(()).unwrap();
|
||||
|
||||
let mut stream = client.send_request(request, false).unwrap();
|
||||
let (response, mut stream) = client.send_request(request, false).unwrap();
|
||||
|
||||
stream.reserve_capacity(11);
|
||||
|
||||
h2.drive(util::wait_for_capacity(stream, 11))
|
||||
h2.drive(util::wait_for_capacity(stream, 11).map(|s| (response, s)))
|
||||
})
|
||||
.and_then(|(h2, mut stream)| {
|
||||
.and_then(|(h2, (response, mut stream))| {
|
||||
assert_eq!(stream.capacity(), 11);
|
||||
|
||||
stream.send_data("hello world".into(), true).unwrap();
|
||||
|
||||
h2.drive(GetResponse { stream: Some(stream) })
|
||||
h2.drive(response)
|
||||
})
|
||||
.and_then(|(h2, (response, _))| {
|
||||
.and_then(|(h2, response)| {
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Wait for the connection to close
|
||||
@@ -838,20 +818,20 @@ fn recv_no_init_window_then_receive_some_init_window() {
|
||||
.uri("https://http2.akamai.com/")
|
||||
.body(()).unwrap();
|
||||
|
||||
let mut stream = client.send_request(request, false).unwrap();
|
||||
let (response, mut stream) = client.send_request(request, false).unwrap();
|
||||
|
||||
stream.reserve_capacity(11);
|
||||
|
||||
h2.drive(util::wait_for_capacity(stream, 11))
|
||||
h2.drive(util::wait_for_capacity(stream, 11).map(|s| (response, s)))
|
||||
})
|
||||
.and_then(|(h2, mut stream)| {
|
||||
.and_then(|(h2, (response, mut stream))| {
|
||||
assert_eq!(stream.capacity(), 11);
|
||||
|
||||
stream.send_data("hello world".into(), true).unwrap();
|
||||
|
||||
h2.drive(GetResponse { stream: Some(stream) })
|
||||
h2.drive(response)
|
||||
})
|
||||
.and_then(|(h2, (response, _))| {
|
||||
.and_then(|(h2, response)| {
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Wait for the connection to close
|
||||
@@ -930,12 +910,12 @@ fn settings_lowered_capacity_returns_capacity_to_connection() {
|
||||
let request = Request::post("https://example.com/one")
|
||||
.body(()).unwrap();
|
||||
|
||||
let mut stream1 = client.send_request(request, false).unwrap();
|
||||
let (resp1, mut stream1) = client.send_request(request, false).unwrap();
|
||||
|
||||
let request = Request::post("https://example.com/two")
|
||||
.body(()).unwrap();
|
||||
|
||||
let mut stream2 = client.send_request(request, false).unwrap();
|
||||
let (resp2, mut stream2) = client.send_request(request, false).unwrap();
|
||||
|
||||
// Reserve capacity for stream one, this will consume all connection level
|
||||
// capacity
|
||||
@@ -955,10 +935,10 @@ fn settings_lowered_capacity_returns_capacity_to_connection() {
|
||||
stream1.send_data("hello world".into(), true).unwrap();
|
||||
|
||||
// Wait for responses..
|
||||
let resp = stream1.wait().unwrap();
|
||||
let resp = resp1.wait().unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
let resp = stream2.wait().unwrap();
|
||||
let resp = resp2.wait().unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
th1.join().unwrap();
|
||||
@@ -1011,7 +991,7 @@ fn increase_target_window_size_after_using_some() {
|
||||
.uri("https://http2.akamai.com/")
|
||||
.body(()).unwrap();
|
||||
|
||||
let res = client.send_request(request, true).unwrap()
|
||||
let res = client.send_request(request, true).unwrap().0
|
||||
.and_then(|res| {
|
||||
// "leak" the capacity for now
|
||||
res.into_parts().1.concat2()
|
||||
@@ -1055,8 +1035,8 @@ fn decrease_target_window_size() {
|
||||
let request = Request::builder()
|
||||
.uri("https://http2.akamai.com/")
|
||||
.body(()).unwrap();
|
||||
let res = client.send_request(request, true).unwrap();
|
||||
conn.drive(res.expect("response"))
|
||||
let (resp, _) = client.send_request(request, true).unwrap();
|
||||
conn.drive(resp.expect("response"))
|
||||
})
|
||||
.and_then(|(mut conn, res)| {
|
||||
conn.set_target_window_size(16_384);
|
||||
|
||||
Reference in New Issue
Block a user