diff --git a/examples/akamai.rs b/examples/akamai.rs index fe1965a..cb9645c 100644 --- a/examples/akamai.rs +++ b/examples/akamai.rs @@ -75,9 +75,9 @@ pub fn main() { .body(()) .unwrap(); - let stream = client.send_request(request, true).unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); - let stream = stream.and_then(|response| { + let stream = response.and_then(|response| { let (_, body) = response.into_parts(); body.for_each(|chunk| { diff --git a/examples/client.rs b/examples/client.rs index bb6f2aa..2a4c6c0 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -70,7 +70,7 @@ pub fn main() { let mut trailers = HeaderMap::new(); trailers.insert("zomg", "hello".parse().unwrap()); - let mut stream = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); // send trailers stream.send_trailers(trailers).unwrap(); @@ -78,7 +78,7 @@ pub fn main() { // Spawn a task to run the conn... handle.spawn(h2.map_err(|e| println!("GOT ERR={:?}", e))); - stream + response .and_then(|response| { println!("GOT RESPONSE: {:?}", response); diff --git a/src/client.rs b/src/client.rs index 92b6cee..39b18de 100644 --- a/src/client.rs +++ b/src/client.rs @@ -29,6 +29,11 @@ pub struct Connection { inner: proto::Connection, } +#[derive(Debug)] +pub struct ResponseFuture { + inner: proto::StreamRef, +} + #[derive(Debug)] pub struct Stream { inner: proto::StreamRef, @@ -113,7 +118,7 @@ where &mut self, request: Request<()>, end_of_stream: bool, - ) -> Result, ::Error> { + ) -> Result<(ResponseFuture, Stream), ::Error> { self.inner .send_request(request, end_of_stream, self.pending.as_ref()) .map_err(Into::into) @@ -121,9 +126,16 @@ where if stream.is_pending_open() { self.pending = Some(stream.key()); } - Stream { + + let response = ResponseFuture { + inner: stream.clone(), + }; + + let stream = Stream { inner: stream, - } + }; + + (response, stream) }) } } @@ -334,19 +346,26 @@ where } } -// ===== impl Stream ===== +// ===== impl ResponseFuture ===== -impl Stream { - /// Receive the HTTP/2.0 response, if it is ready. - pub fn poll_response(&mut self) -> Poll>, ::Error> { +impl Future for ResponseFuture { + type Item = Response>; + type Error = ::Error; + + fn poll(&mut self) -> Poll { let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); + let body = Body { inner: ReleaseCapacity { inner: self.inner.clone() }, }; Ok(Response::from_parts(parts, body).into()) } +} +// ===== impl Stream ===== + +impl Stream { /// Request capacity to send data pub fn reserve_capacity(&mut self, capacity: usize) { // TODO: Check for overflow @@ -381,15 +400,6 @@ impl Stream { } } -impl Future for Stream { - type Item = Response>; - type Error = ::Error; - - fn poll(&mut self) -> Poll { - self.poll_response() - } -} - // ===== impl Body ===== impl Body { diff --git a/tests/client_request.rs b/tests/client_request.rs index c6a927d..1c1640e 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -47,7 +47,7 @@ fn client_other_thread() { .unwrap(); let res = client .send_request(request, true) - .unwrap() + .unwrap().0 .wait() .expect("request"); assert_eq!(res.status(), StatusCode::OK); @@ -85,13 +85,13 @@ fn recv_invalid_server_stream_id() { .unwrap(); info!("sending request"); - let stream = client.send_request(request, true).unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); // The connection errors assert!(h2.wait().is_err()); // The stream errors - assert!(stream.wait().is_err()); + assert!(response.wait().is_err()); } #[test] @@ -112,9 +112,9 @@ fn request_stream_id_overflows() { .unwrap(); // first request is allowed - let req = client.send_request(request, true).unwrap().unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); - h2.drive(req).and_then(move |(h2, _)| { + h2.drive(response).and_then(move |(h2, _)| { let request = Request::builder() .method(Method::GET) .uri("https://example.com/") @@ -180,8 +180,9 @@ fn client_builder_max_concurrent_streams() { .uri("https://example.com/") .body(()) .unwrap(); - let req = client.send_request(request, true).unwrap().unwrap(); - h2.drive(req).map(move |(h2, _)| (client, h2)) + + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).map(move |(h2, _)| (client, h2)) }); h2.join(srv).wait().expect("wait"); @@ -226,8 +227,8 @@ fn request_over_max_concurrent_streams_errors() { .unwrap(); // first request is allowed - let req = client.send_request(request, true).unwrap().unwrap(); - h2.drive(req).map(move |(h2, _)| (client, h2)) + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).map(move |(h2, _)| (client, h2)) }) .and_then(|(mut client, h2)| { let request = Request::builder() @@ -237,7 +238,7 @@ fn request_over_max_concurrent_streams_errors() { .unwrap(); // first request is allowed - let mut req = client.send_request(request, false).unwrap(); + let (resp1, mut stream1) = client.send_request(request, false).unwrap(); let request = Request::builder() .method(Method::POST) @@ -246,7 +247,7 @@ fn request_over_max_concurrent_streams_errors() { .unwrap(); // second request is put into pending_open - let mut req2 = client.send_request(request, false).unwrap(); + let (resp2, mut stream2) = client.send_request(request, false).unwrap(); let request = Request::builder() .method(Method::GET) @@ -260,11 +261,12 @@ fn request_over_max_concurrent_streams_errors() { let err = client.send_request(request, true).unwrap_err(); assert_eq!(err.to_string(), "user error: rejected"); - req.send_data("hello".into(), true).expect("req send_data"); - h2.drive(req.expect("req")).and_then(move |(h2, _)| { - req2.send_data("hello".into(), true) + stream1.send_data("hello".into(), true).expect("req send_data"); + + h2.drive(resp1.expect("req")).and_then(move |(h2, _)| { + stream2.send_data("hello".into(), true) .expect("req2 send_data"); - h2.expect("h2").join(req2.expect("req2")) + h2.expect("h2").join(resp2.expect("req2")) }) }); @@ -305,12 +307,15 @@ fn sending_request_on_closed_connection() { .uri("https://http2.akamai.com/") .body(()) .unwrap(); + // first request works let req = client .send_request(request, true) .expect("send_request1") + .0 .expect("response1") .map(|_| ()); + // after finish request1, there should be a conn error let h2 = h2.then(|res| { res.expect_err("h2 error"); diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 0ee5a14..b0f7df9 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -35,7 +35,7 @@ fn send_data_without_requesting_capacity() { .body(()) .unwrap(); - let mut stream = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); // The capacity should be immediately allocated assert_eq!(stream.capacity(), 0); @@ -44,7 +44,7 @@ fn send_data_without_requesting_capacity() { stream.send_data(payload[..].into(), true).unwrap(); // Get the response - let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + let resp = h2.run(response).unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT); h2.wait().unwrap(); @@ -90,6 +90,7 @@ fn release_capacity_sends_window_update() { .unwrap(); let req = client.send_request(request, true).unwrap() + .0 .unwrap() // Get the response .and_then(|resp| { @@ -117,6 +118,7 @@ fn release_capacity_sends_window_update() { assert_eq!(buf.unwrap().len(), payload.len()); Ok(()) }); + h2.unwrap().join(req) }); h2.join(mock).wait().unwrap(); @@ -153,6 +155,7 @@ fn release_capacity_of_small_amount_does_not_send_window_update() { .unwrap(); let req = client.send_request(request, true).unwrap() + .0 .unwrap() // Get the response .and_then(|resp| { @@ -222,7 +225,7 @@ fn recv_data_overflows_connection_window() { let req = client .send_request(request, true) .unwrap() - .unwrap() + .0.unwrap() .and_then(|resp| { assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_parts().1; @@ -292,7 +295,7 @@ fn recv_data_overflows_stream_window() { let req = client .send_request(request, true) .unwrap() - .unwrap() + .0.unwrap() .and_then(|resp| { assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_parts().1; @@ -343,7 +346,7 @@ fn stream_close_by_data_frame_releases_capacity() { .unwrap(); // Send request - let mut s1 = client.send_request(request, false).unwrap(); + let (resp1, mut s1) = client.send_request(request, false).unwrap(); // This effectively reserves the entire connection window s1.reserve_capacity(window_size); @@ -359,7 +362,7 @@ fn stream_close_by_data_frame_releases_capacity() { .unwrap(); // Create a second stream - let mut s2 = client.send_request(request, false).unwrap(); + let (resp2, mut s2) = client.send_request(request, false).unwrap(); // Request capacity s2.reserve_capacity(5); @@ -380,8 +383,8 @@ fn stream_close_by_data_frame_releases_capacity() { // Drive both streams to prevent the handles from being dropped // (which will send a RST_STREAM) before the connection is closed. - h2.drive(s1) - .and_then(move |(h2, _)| h2.drive(s2)) + h2.drive(resp1) + .and_then(move |(h2, _)| h2.drive(resp2)) }) .unwrap(); @@ -414,7 +417,7 @@ fn stream_close_by_trailers_frame_releases_capacity() { .unwrap(); // Send request - let mut s1 = client.send_request(request, false).unwrap(); + let (resp1, mut s1) = client.send_request(request, false).unwrap(); // This effectively reserves the entire connection window s1.reserve_capacity(window_size); @@ -430,7 +433,7 @@ fn stream_close_by_trailers_frame_releases_capacity() { .unwrap(); // Create a second stream - let mut s2 = client.send_request(request, false).unwrap(); + let (resp2, mut s2) = client.send_request(request, false).unwrap(); // Request capacity s2.reserve_capacity(5); @@ -451,8 +454,8 @@ fn stream_close_by_trailers_frame_releases_capacity() { // Drive both streams to prevent the handles from being dropped // (which will send a RST_STREAM) before the connection is closed. - h2.drive(s1) - .and_then(move |(h2, _)| h2.drive(s2)) + h2.drive(resp1) + .and_then(move |(h2, _)| h2.drive(resp2)) }) .unwrap(); @@ -484,27 +487,6 @@ fn stream_close_by_send_reset_frame_releases_capacity() {} #[ignore] fn stream_close_by_recv_reset_frame_releases_capacity() {} -use futures::{Async, Poll}; - -struct GetResponse { - stream: Option>, -} - -impl Future for GetResponse { - type Item = (Response>, client::Stream); - type Error = (); - - fn poll(&mut self) -> Poll { - let response = match self.stream.as_mut().unwrap().poll_response() { - Ok(Async::Ready(v)) => v, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(e) => panic!("unexpected error; {:?}", e), - }; - - Ok(Async::Ready((response, self.stream.take().unwrap()))) - } -} - #[test] fn recv_window_update_on_stream_closed_by_data_frame() { let _ = ::env_logger::init(); @@ -519,12 +501,10 @@ fn recv_window_update_on_stream_closed_by_data_frame() { .body(()) .unwrap(); - let stream = client.send_request(request, false).unwrap(); + let (response, stream) = client.send_request(request, false).unwrap(); // Wait for the response - h2.drive(GetResponse { - stream: Some(stream), - }) + h2.drive(response.map(|response| (response, stream))) }) .and_then(|(h2, (response, mut stream))| { assert_eq!(response.status(), StatusCode::OK); @@ -570,7 +550,7 @@ fn reserved_capacity_assigned_in_multi_window_updates() { .body(()) .unwrap(); - let mut stream = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); // Consume the capacity let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; @@ -579,17 +559,17 @@ fn reserved_capacity_assigned_in_multi_window_updates() { // Reserve more data than we want stream.reserve_capacity(10); - h2.drive(util::wait_for_capacity(stream, 5)) + h2.drive( + util::wait_for_capacity(stream, 5) + .map(|stream| (response, stream))) }) - .and_then(|(h2, mut stream)| { + .and_then(|(h2, (response, mut stream))| { stream.send_data("hello".into(), false).unwrap(); stream.send_data("world".into(), true).unwrap(); - h2.drive(GetResponse { - stream: Some(stream), - }) + h2.drive(response) }) - .and_then(|(h2, (response, _))| { + .and_then(|(h2, response)| { assert_eq!(response.status(), StatusCode::NO_CONTENT); // Wait for the connection to close @@ -709,8 +689,8 @@ fn connection_notified_on_released_capacity() { }); // Get the two requests - let a = rx.recv().unwrap(); - let b = rx.recv().unwrap(); + let (a, _) = rx.recv().unwrap(); + let (b, _) = rx.recv().unwrap(); // Get the first response let response = a.wait().unwrap(); @@ -777,20 +757,20 @@ fn recv_settings_removes_available_capacity() { .uri("https://http2.akamai.com/") .body(()).unwrap(); - let mut stream = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); stream.reserve_capacity(11); - h2.drive(util::wait_for_capacity(stream, 11)) + h2.drive(util::wait_for_capacity(stream, 11).map(|s| (response, s))) }) - .and_then(|(h2, mut stream)| { + .and_then(|(h2, (response, mut stream))| { assert_eq!(stream.capacity(), 11); stream.send_data("hello world".into(), true).unwrap(); - h2.drive(GetResponse { stream: Some(stream) }) + h2.drive(response) }) - .and_then(|(h2, (response, _))| { + .and_then(|(h2, response)| { assert_eq!(response.status(), StatusCode::NO_CONTENT); // Wait for the connection to close @@ -838,20 +818,20 @@ fn recv_no_init_window_then_receive_some_init_window() { .uri("https://http2.akamai.com/") .body(()).unwrap(); - let mut stream = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); stream.reserve_capacity(11); - h2.drive(util::wait_for_capacity(stream, 11)) + h2.drive(util::wait_for_capacity(stream, 11).map(|s| (response, s))) }) - .and_then(|(h2, mut stream)| { + .and_then(|(h2, (response, mut stream))| { assert_eq!(stream.capacity(), 11); stream.send_data("hello world".into(), true).unwrap(); - h2.drive(GetResponse { stream: Some(stream) }) + h2.drive(response) }) - .and_then(|(h2, (response, _))| { + .and_then(|(h2, response)| { assert_eq!(response.status(), StatusCode::NO_CONTENT); // Wait for the connection to close @@ -930,12 +910,12 @@ fn settings_lowered_capacity_returns_capacity_to_connection() { let request = Request::post("https://example.com/one") .body(()).unwrap(); - let mut stream1 = client.send_request(request, false).unwrap(); + let (resp1, mut stream1) = client.send_request(request, false).unwrap(); let request = Request::post("https://example.com/two") .body(()).unwrap(); - let mut stream2 = client.send_request(request, false).unwrap(); + let (resp2, mut stream2) = client.send_request(request, false).unwrap(); // Reserve capacity for stream one, this will consume all connection level // capacity @@ -955,10 +935,10 @@ fn settings_lowered_capacity_returns_capacity_to_connection() { stream1.send_data("hello world".into(), true).unwrap(); // Wait for responses.. - let resp = stream1.wait().unwrap(); + let resp = resp1.wait().unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT); - let resp = stream2.wait().unwrap(); + let resp = resp2.wait().unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT); th1.join().unwrap(); @@ -1011,7 +991,7 @@ fn increase_target_window_size_after_using_some() { .uri("https://http2.akamai.com/") .body(()).unwrap(); - let res = client.send_request(request, true).unwrap() + let res = client.send_request(request, true).unwrap().0 .and_then(|res| { // "leak" the capacity for now res.into_parts().1.concat2() @@ -1055,8 +1035,8 @@ fn decrease_target_window_size() { let request = Request::builder() .uri("https://http2.akamai.com/") .body(()).unwrap(); - let res = client.send_request(request, true).unwrap(); - conn.drive(res.expect("response")) + let (resp, _) = client.send_request(request, true).unwrap(); + conn.drive(resp.expect("response")) }) .and_then(|(mut conn, res)| { conn.set_target_window_size(16_384); diff --git a/tests/prioritization.rs b/tests/prioritization.rs index 4c881a5..b1a87d5 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -33,7 +33,7 @@ fn single_stream_send_large_body() { .body(()) .unwrap(); - let mut stream = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); // Reserve capacity to send the payload stream.reserve_capacity(payload.len()); @@ -45,7 +45,7 @@ fn single_stream_send_large_body() { stream.send_data(payload[..].into(), true).unwrap(); // Get the response - let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + let resp = h2.run(response).unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT); h2.wait().unwrap(); @@ -87,7 +87,7 @@ fn single_stream_send_extra_large_body_multi_frames_one_buffer() { .body(()) .unwrap(); - let mut stream = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); stream.reserve_capacity(payload.len()); @@ -98,7 +98,7 @@ fn single_stream_send_extra_large_body_multi_frames_one_buffer() { stream.send_data(payload.into(), true).unwrap(); // Get the response - let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + let resp = h2.run(response).unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT); h2.wait().unwrap(); @@ -152,7 +152,7 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { .body(()) .unwrap(); - let mut stream = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); stream.reserve_capacity(payload.len()); @@ -163,7 +163,7 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { stream.send_data(payload.into(), true).unwrap(); // Get the response - let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + let resp = h2.run(response).unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT); @@ -185,7 +185,7 @@ fn send_data_receive_window_update() { .unwrap(); // Send request - let mut stream = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); // Send data frame stream.send_data("hello".into(), false).unwrap(); @@ -196,9 +196,9 @@ fn send_data_receive_window_update() { h2.drive(util::wait_for_capacity( stream, frame::DEFAULT_INITIAL_WINDOW_SIZE as usize, - )) + ).map(|s| (response, s))) }) - .and_then(|(h2, mut stream)| { + .and_then(|(h2, (_r, mut stream))| { let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; stream.send_data(payload.into(), true).unwrap(); diff --git a/tests/push_promise.rs b/tests/push_promise.rs index f8bba00..1178167 100644 --- a/tests/push_promise.rs +++ b/tests/push_promise.rs @@ -29,7 +29,7 @@ fn recv_push_works() { let req = client .send_request(request, true) .unwrap() - .unwrap() + .0.unwrap() .and_then(|resp| { assert_eq!(resp.status(), StatusCode::OK); Ok(()) @@ -68,7 +68,8 @@ fn recv_push_when_push_disabled_is_conn_error() { .uri("https://http2.akamai.com/") .body(()) .unwrap(); - let req = client.send_request(request, true).unwrap().then(|res| { + + let req = client.send_request(request, true).unwrap().0.then(|res| { let err = res.unwrap_err(); assert_eq!( err.to_string(), diff --git a/tests/stream_states.rs b/tests/stream_states.rs index 7d8c3aa..66397c3 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -29,9 +29,9 @@ fn send_recv_headers_only() { .unwrap(); info!("sending request"); - let mut stream = client.send_request(request, true).unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); - let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + let resp = h2.run(response).unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT); h2.wait().unwrap(); @@ -71,7 +71,7 @@ fn send_recv_data() { .unwrap(); info!("sending request"); - let mut stream = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); // Reserve send capacity stream.reserve_capacity(5); @@ -82,7 +82,7 @@ fn send_recv_data() { stream.send_data("hello", true).unwrap(); // Get the response - let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + let resp = h2.run(response).unwrap(); assert_eq!(resp.status(), StatusCode::OK); // Take the body @@ -128,9 +128,9 @@ fn send_headers_recv_data_single_frame() { .unwrap(); info!("sending request"); - let mut stream = client.send_request(request, true).unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); - let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + let resp = h2.run(response).unwrap(); assert_eq!(resp.status(), StatusCode::OK); // Take the body @@ -158,8 +158,8 @@ fn closed_streams_are_released() { let request = Request::get("https://example.com/").body(()).unwrap(); // Send request - let stream = client.send_request(request, true).unwrap(); - h2.drive(stream).and_then(move |(_, response)| { + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).and_then(move |(_, response)| { assert_eq!(response.status(), StatusCode::NO_CONTENT); // There are no active streams @@ -207,7 +207,7 @@ fn errors_if_recv_frame_exceeds_max_frame_size() { let req = client .send_request(request, true) .unwrap() - .unwrap() + .0.unwrap() .and_then(|resp| { assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_parts().1; @@ -266,7 +266,7 @@ fn configure_max_frame_size() { let req = client .send_request(request, true) .unwrap() - .expect("response") + .0.expect("response") .and_then(|resp| { assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_parts().1; @@ -334,7 +334,7 @@ fn recv_goaway_finishes_processed_streams() { let req1 = client.send_request(request, true) .unwrap() - .expect("response") + .0.expect("response") .and_then(|resp| { assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_parts().1; @@ -354,7 +354,7 @@ fn recv_goaway_finishes_processed_streams() { .unwrap(); let req2 = client.send_request(request, true) .unwrap() - .then(|res| { + .0.then(|res| { let err = res.unwrap_err(); assert_eq!(err.to_string(), "protocol error: not a result of an error"); Ok::<(), ()>(()) diff --git a/tests/trailers.rs b/tests/trailers.rs index fb3d690..f584990 100644 --- a/tests/trailers.rs +++ b/tests/trailers.rs @@ -32,9 +32,9 @@ fn recv_trailers_only() { .unwrap(); info!("sending request"); - let mut stream = client.send_request(request, true).unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); - let response = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + let response = h2.run(response).unwrap(); assert_eq!(response.status(), StatusCode::OK); let (_, mut body) = response.into_parts(); @@ -80,14 +80,14 @@ fn send_trailers_immediately() { .unwrap(); info!("sending request"); - let mut stream = client.send_request(request, false).unwrap(); + let (response, mut stream) = client.send_request(request, false).unwrap(); let mut trailers = HeaderMap::new(); trailers.insert("zomg", "hello".parse().unwrap()); stream.send_trailers(trailers).unwrap(); - let response = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + let response = h2.run(response).unwrap(); assert_eq!(response.status(), StatusCode::OK); let (_, mut body) = response.into_parts();