diff --git a/Cargo.toml b/Cargo.toml index 901ed0a..73a2eaf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ members = [ futures = "0.1" tokio-io = "0.1.4" bytes = "0.4.7" -http = "0.1.3" +http = "0.1.8" byteorder = "1.0" log = "0.4.1" fnv = "1.0.5" diff --git a/src/client.rs b/src/client.rs index aaaaf63..31f4a76 100644 --- a/src/client.rs +++ b/src/client.rs @@ -162,8 +162,8 @@ use frame::{Headers, Pseudo, Reason, Settings, StreamId}; use proto; use bytes::{Bytes, IntoBuf}; -use futures::{Async, Future, Poll}; -use http::{uri, Request, Response, Method, Version}; +use futures::{Async, Future, Poll, Stream}; +use http::{uri, HeaderMap, Request, Response, Method, Version}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::WriteAll; @@ -294,6 +294,54 @@ pub struct Connection { #[must_use = "futures do nothing unless polled"] pub struct ResponseFuture { inner: proto::OpaqueStreamRef, + push_promise_consumed: bool, +} + +/// A future of a pushed HTTP response. +/// +/// We have to differentiate between pushed and non pushed because of the spec +/// +/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream +/// > that is in either the "open" or "half-closed (remote)" state. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct PushedResponseFuture { + inner: ResponseFuture, +} + +/// A pushed response and corresponding request headers +#[derive(Debug)] +pub struct PushPromise { + /// The request headers + request: Request<()>, + + /// The pushed response + response: PushedResponseFuture, +} + +#[derive(Debug)] +/// A stream of pushed responses and corresponding promised requests +pub struct PushPromises { + inner: proto::OpaqueStreamRef, +} + +impl Stream for PushPromises { + type Item = PushPromise; + type Error = ::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match try_ready!(self.inner.poll_pushed()) { + Some((request, response)) => { + let response = PushedResponseFuture { + inner: ResponseFuture { + inner: response, push_promise_consumed: false + } + }; + Ok(Async::Ready(Some(PushPromise{request, response}))) + } + None => Ok(Async::Ready(None)), + } + } } /// Builds client connections with custom configuration values. @@ -516,7 +564,7 @@ where /// .body(()) /// .unwrap(); /// - /// // Send the request to the server. Since we are not sending a + /// // Send the request to the server. If we are not sending a /// // body or trailers, we can drop the `SendStream` instance. /// let (response, mut send_stream) = send_request /// .send_request(request, false).unwrap(); @@ -567,6 +615,7 @@ where let response = ResponseFuture { inner: stream.clone_to_opaque(), + push_promise_consumed: false, }; let stream = SendStream::new(stream); @@ -1352,6 +1401,61 @@ impl ResponseFuture { pub fn stream_id(&self) -> ::StreamId { ::StreamId::from_internal(self.inner.stream_id()) } + /// Returns a stream of PushPromises + /// + /// # Panics + /// + /// If this method has been called before + /// or the stream was itself was pushed + pub fn push_promises(&mut self) -> PushPromises { + if self.push_promise_consumed { + panic!("Reference to push promises stream taken!"); + } + self.push_promise_consumed = true; + PushPromises { inner: self.inner.clone() } + } +} + +// ===== impl PushPromise ===== + +impl PushPromise { + /// Returns a reference to the push promise's request headers. + pub fn request(&self) -> &Request<()> { + &self.request + } + + /// Returns a mutable reference to the push promise's request headers. + pub fn request_mut(&mut self) -> &mut Request<()> { + &mut self.request + } + + /// Consumes `self`, returning the push promise's request headers and + /// response future. + pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) { + (self.request, self.response) + } +} + +// ===== impl PushedResponseFuture ===== + +impl Future for PushedResponseFuture { + type Item = Response; + type Error = ::Error; + + fn poll(&mut self) -> Poll { + self.inner.poll() + } +} + +impl PushedResponseFuture { + /// Returns the stream ID of the response stream. + /// + /// # Panics + /// + /// If the lock on the stream store has been poisoned. + pub fn stream_id(&self) -> ::StreamId { + self.inner.stream_id() + } } // ===== impl Peer ===== @@ -1431,12 +1535,11 @@ impl proto::Peer for Peer { false } - fn convert_poll_message(headers: Headers) -> Result { + fn convert_poll_message( + pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId + ) -> Result { let mut b = Response::builder(); - let stream_id = headers.stream_id(); - let (pseudo, fields) = headers.into_parts(); - b.version(Version::HTTP_2); if let Some(status) = pseudo.status { diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 6ed8f10..6b626fb 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -380,6 +380,13 @@ impl PushPromise { } } +impl PushPromise { + /// Consume `self`, returning the parts of the frame + pub fn into_parts(self) -> (Pseudo, HeaderMap) { + (self.header_block.pseudo, self.header_block.fields) + } +} + #[cfg(feature = "unstable")] impl PushPromise { pub fn new( @@ -400,10 +407,6 @@ impl PushPromise { } } - pub fn into_parts(self) -> (Pseudo, HeaderMap) { - (self.header_block.pseudo, self.header_block.fields) - } - pub fn fields(&self) -> &HeaderMap { &self.header_block.fields } diff --git a/src/proto/peer.rs b/src/proto/peer.rs index 8362db6..bcc73b9 100644 --- a/src/proto/peer.rs +++ b/src/proto/peer.rs @@ -1,9 +1,9 @@ use codec::RecvError; use error::Reason; -use frame::{Headers, StreamId}; +use frame::{Pseudo, StreamId}; use proto::Open; -use http::{Request, Response}; +use http::{HeaderMap, Request, Response}; use std::fmt; @@ -16,7 +16,9 @@ pub(crate) trait Peer { fn is_server() -> bool; - fn convert_poll_message(headers: Headers) -> Result; + fn convert_poll_message( + pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId + ) -> Result; fn is_local_init(id: StreamId) -> bool { assert!(!id.is_zero()); @@ -51,12 +53,14 @@ impl Dyn { self.is_server() == id.is_server_initiated() } - pub fn convert_poll_message(&self, headers: Headers) -> Result { + pub fn convert_poll_message( + &self, pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId + ) -> Result { if self.is_server() { - ::server::Peer::convert_poll_message(headers) + ::server::Peer::convert_poll_message(pseudo, fields, stream_id) .map(PollMessage::Server) } else { - ::client::Peer::convert_poll_message(headers) + ::client::Peer::convert_poll_message(pseudo, fields, stream_id) .map(PollMessage::Client) } } diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 6216550..02a2359 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -28,7 +28,6 @@ use frame::{StreamId, StreamIdOverflow}; use proto::*; use bytes::Bytes; -use http::{Request, Response}; use std::time::Duration; #[derive(Debug)] diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 83e49a4..df54a39 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -3,7 +3,7 @@ use {frame, proto}; use codec::{RecvError, UserError}; use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE}; -use http::HeaderMap; +use http::{HeaderMap, Response, Request, Method}; use std::io; use std::time::{Duration, Instant}; @@ -216,7 +216,9 @@ impl Recv { }; } - let message = counts.peer().convert_poll_message(frame)?; + let stream_id = frame.stream_id(); + let (pseudo, fields) = frame.into_parts(); + let message = counts.peer().convert_poll_message(pseudo, fields, stream_id)?; // Push the frame onto the stream's recv buffer stream @@ -247,6 +249,37 @@ impl Recv { } } + /// Called by the client to get pushed response + pub fn poll_pushed( + &mut self, stream: &mut store::Ptr + ) -> Poll, store::Key)>, proto::Error> { + use super::peer::PollMessage::*; + + let mut ppp = stream.pending_push_promises.take(); + let pushed = ppp.pop(stream.store_mut()).map( + |mut pushed| match pushed.pending_recv.pop_front(&mut self.buffer) { + Some(Event::Headers(Server(headers))) => + Async::Ready(Some((headers, pushed.key()))), + // When frames are pushed into the queue, it is verified that + // the first frame is a HEADERS frame. + _ => panic!("Headers not set on pushed stream") + } + ); + stream.pending_push_promises = ppp; + if let Some(p) = pushed { + Ok(p) + } else { + let is_open = stream.state.ensure_recv_open()?; + + if is_open { + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } else { + Ok(Async::Ready(None)) + } + } + } + /// Called by the client to get the response pub fn poll_response( &mut self, @@ -538,13 +571,7 @@ impl Recv { frame: frame::PushPromise, stream: &mut store::Ptr, ) -> Result<(), RecvError> { - - // TODO: Streams in the reserved states do not count towards the concurrency - // limit. However, it seems like there should be a cap otherwise this - // could grow in memory indefinitely. - stream.state.reserve_remote()?; - if frame.is_over_size() { // A frame is over size if the decoded header block was bigger than // SETTINGS_MAX_HEADER_LIST_SIZE. @@ -564,9 +591,46 @@ impl Recv { }); } + let promised_id = frame.promised_id(); + use http::header; + let (pseudo, fields) = frame.into_parts(); + let req = ::server::Peer::convert_poll_message(pseudo, fields, promised_id)?; + // The spec has some requirements for promised request headers + // [https://httpwg.org/specs/rfc7540.html#PushRequests] + + // A promised request "that indicates the presence of a request body + // MUST reset the promised stream with a stream error" + if let Some(content_length) = req.headers().get(header::CONTENT_LENGTH) { + match parse_u64(content_length.as_bytes()) { + Ok(0) => {}, + _ => { + return Err(RecvError::Stream { + id: promised_id, + reason: Reason::PROTOCOL_ERROR, + }); + }, + } + } + // "The server MUST include a method in the :method pseudo-header field + // that is safe and cacheable" + if !Self::safe_and_cacheable(req.method()) { + return Err(RecvError::Stream { + id: promised_id, + reason: Reason::PROTOCOL_ERROR, + }); + } + use super::peer::PollMessage::*; + stream.pending_recv.push_back(&mut self.buffer, Event::Headers(Server(req))); + stream.notify_recv(); Ok(()) } + fn safe_and_cacheable(method: &Method) -> bool { + // Cacheable: https://httpwg.org/specs/rfc7231.html#cacheable.methods + // Safe: https://httpwg.org/specs/rfc7231.html#safe.methods + return method == Method::GET || method == Method::HEAD; + } + /// Ensures that `id` is not in the `Idle` state. pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { if let Ok(next) = self.next_stream_id { diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 36257fe..d8d3c6c 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -153,10 +153,7 @@ impl State { if eos { Closed(Cause::EndStream) } else { - Open { - local: AwaitingHeaders, - remote, - } + HalfClosedLocal(Streaming) } }, Open { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index bc33244..0b41530 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -245,7 +245,6 @@ where if let Err(RecvError::Stream { .. }) = res { actions.recv.release_connection_capacity(sz as WindowSize, &mut None); } - actions.reset_on_recv_stream_err(send_buffer, stream, counts, res) }) } @@ -426,6 +425,10 @@ where None => return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)), }; + // TODO: Streams in the reserved states do not count towards the concurrency + // limit. However, it seems like there should be a cap otherwise this + // could grow in memory indefinitely. + // Ensure that we can reserve streams me.actions.recv.ensure_can_reserve()?; @@ -437,8 +440,9 @@ where return Ok(()); } - // Create a scope - let child_key = { + // Try to handle the frame and create a corresponding key for the pushed stream + // this requires a bit of indirection to make the borrow checker happy. + let child_key: Option = { // Create state for the stream let stream = me.store.insert(promised_id, { Stream::new( @@ -450,23 +454,29 @@ where let actions = &mut me.actions; me.counts.transition(stream, |counts, stream| { - let res = actions.recv.recv_push_promise(frame, stream); + let stream_valid = + actions.recv.recv_push_promise(frame, stream); - let mut send_buffer = self.send_buffer.inner.lock().unwrap(); - actions.reset_on_recv_stream_err(&mut *send_buffer, stream, counts, res) - .map(|_| stream.key()) + match stream_valid { + Ok(()) => + Ok(Some(stream.key())), + _ => { + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + actions.reset_on_recv_stream_err(&mut *send_buffer, stream, counts, stream_valid) + .map(|()| None) + } + } })? }; + // If we're successful, push the headers and stream... + if let Some(child) = child_key { + let mut ppp = me.store[parent_key].pending_push_promises.take(); + ppp.push(&mut me.store.resolve(child)); - // Push the stream... this requires a bit of indirection to make - // the borrow checker happy. - let mut ppp = me.store[parent_key].pending_push_promises.take(); - ppp.push(&mut me.store.resolve(child_key)); - - let parent = &mut me.store[parent_key]; - - parent.pending_push_promises = ppp; - parent.notify_recv(); + let parent = &mut me.store.resolve(parent_key); + parent.pending_push_promises = ppp; + parent.notify_recv(); + }; Ok(()) } @@ -972,6 +982,26 @@ impl OpaqueStreamRef { me.actions.recv.poll_response(&mut stream) } + /// Called by a client to check for a pushed request. + pub fn poll_pushed( + &mut self + ) -> Poll, OpaqueStreamRef)>, proto::Error> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let res = { + let mut stream = me.store.resolve(self.key); + try_ready!(me.actions.recv.poll_pushed(&mut stream)) + }; + Ok(Async::Ready(res.map(|(h, key)| { + me.store.resolve(key).ref_inc(); + let opaque_ref = + OpaqueStreamRef { + inner: self.inner.clone(), key, + }; + (h, opaque_ref) + }))) + } pub fn body_is_empty(&self) -> bool { let mut me = self.inner.lock().unwrap(); @@ -1102,6 +1132,7 @@ fn drop_stream_ref(inner: &Mutex, key: store::Key) { maybe_cancel(stream, actions, counts); if stream.ref_count == 0 { + // We won't be able to reach our push promises anymore let mut ppp = stream.pending_push_promises.take(); while let Some(promise) = ppp.pop(stream.store_mut()) { counts.transition(promise, |counts, stream| { diff --git a/src/server.rs b/src/server.rs index 124ae52..15558ea 100644 --- a/src/server.rs +++ b/src/server.rs @@ -131,12 +131,12 @@ use {SendStream, RecvStream, ReleaseCapacity}; use codec::{Codec, RecvError}; -use frame::{self, Reason, Settings, StreamId}; +use frame::{self, Pseudo, Reason, Settings, StreamId}; use proto::{self, Config, Prioritized}; use bytes::{Buf, Bytes, IntoBuf}; use futures::{self, Async, Future, Poll}; -use http::{Request, Response}; +use http::{HeaderMap, Request, Response}; use std::{convert, fmt, io, mem}; use std::time::Duration; use tokio_io::{AsyncRead, AsyncWrite}; @@ -1168,7 +1168,7 @@ impl Peer { // Build the set pseudo header set. All requests will include `method` // and `path`. - let pseudo = frame::Pseudo::response(status); + let pseudo = Pseudo::response(status); // Create the HEADERS frame let mut frame = frame::Headers::new(id, pseudo, headers); @@ -1192,14 +1192,13 @@ impl proto::Peer for Peer { proto::DynPeer::Server } - fn convert_poll_message(headers: frame::Headers) -> Result { + fn convert_poll_message( + pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId + ) -> Result { use http::{uri, Version}; let mut b = Request::builder(); - let stream_id = headers.stream_id(); - let (pseudo, fields) = headers.into_parts(); - macro_rules! malformed { ($($arg:tt)*) => {{ debug!($($arg)*); diff --git a/tests/h2-support/src/frames.rs b/tests/h2-support/src/frames.rs index 3ef34cb..36e890e 100644 --- a/tests/h2-support/src/frames.rs +++ b/tests/h2-support/src/frames.rs @@ -220,6 +220,18 @@ impl Mock { Mock(frame) } + pub fn field(self, key: K, value: V) -> Self + where + K: HttpTryInto, + V: HttpTryInto, + { + let (id, promised, pseudo, mut fields) = self.into_parts(); + fields.insert(key.try_into().unwrap(), value.try_into().unwrap()); + let frame = frame::PushPromise::new(id, promised, pseudo, fields); + Mock(frame) + } + + fn into_parts(self) -> (StreamId, StreamId, frame::Pseudo, HeaderMap) { assert!(self.0.is_end_headers(), "unset eoh will be lost"); let id = self.0.stream_id(); diff --git a/tests/h2-tests/tests/push_promise.rs b/tests/h2-tests/tests/push_promise.rs index 44cfefe..1631daa 100644 --- a/tests/h2-tests/tests/push_promise.rs +++ b/tests/h2-tests/tests/push_promise.rs @@ -4,8 +4,6 @@ use h2_support::prelude::*; #[test] fn recv_push_works() { - // tests that by default, received push promises work - // TODO: once API exists, read the pushed response let _ = ::env_logger::try_init(); let (io, srv) = mock::new(); @@ -17,9 +15,11 @@ fn recv_push_works() { .request("GET", "https://http2.akamai.com/") .eos(), ) + .send_frame(frames::headers(1).response(404)) .send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css")) - .send_frame(frames::headers(1).response(200).eos()) - .send_frame(frames::headers(2).response(200).eos()); + .send_frame(frames::data(1, "").eos()) + .send_frame(frames::headers(2).response(200)) + .send_frame(frames::data(2, "promised_data").eos()); let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| { let request = Request::builder() @@ -27,16 +27,82 @@ fn recv_push_works() { .uri("https://http2.akamai.com/") .body(()) .unwrap(); - let req = client + let (mut resp, _) = client .send_request(request, true) - .unwrap() - .0.unwrap() - .and_then(|resp| { + .unwrap(); + let pushed = resp.push_promises(); + let check_resp_status = resp.unwrap().map(|resp| { + assert_eq!(resp.status(), StatusCode::NOT_FOUND) + }); + let check_pushed_request = pushed.and_then(|headers| { + let (request, response) = headers.into_parts(); + assert_eq!(request.into_parts().0.method, Method::GET); + response + }); + let check_pushed_response = check_pushed_request.and_then( + |resp| { assert_eq!(resp.status(), StatusCode::OK); - Ok(()) - }); + resp.into_body().concat2().map(|b| assert_eq!(b, "promised_data")) + } + ).collect().unwrap().map(|ps| { + assert_eq!(1, ps.len()) + }); + h2.drive(check_resp_status.join(check_pushed_response)) + }); - h2.drive(req) + h2.join(mock).wait().unwrap(); +} + +#[test] +fn pushed_streams_arent_dropped_too_early() { + // tests that by default, received push promises work + let _ = ::env_logger::try_init(); + + let (io, srv) = mock::new(); + let mock = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .send_frame(frames::headers(1).response(404)) + .send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css")) + .send_frame(frames::push_promise(1, 4).request("GET", "https://http2.akamai.com/style2.css")) + .send_frame(frames::data(1, "").eos()) + .idle_ms(10) + .send_frame(frames::headers(2).response(200)) + .send_frame(frames::headers(4).response(200).eos()) + .send_frame(frames::data(2, "").eos()) + .recv_frame(frames::go_away(4)); + + let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| { + let request = Request::builder() + .method(Method::GET) + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + let (mut resp, _) = client + .send_request(request, true) + .unwrap(); + let pushed = resp.push_promises(); + let check_status = resp.unwrap().and_then(|resp| { + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + Ok(()) + }); + let check_pushed_headers = pushed.and_then(|headers| { + let (request, response) = headers.into_parts(); + assert_eq!(request.into_parts().0.method, Method::GET); + response + }); + let check_pushed = check_pushed_headers.map( + |resp| assert_eq!(resp.status(), StatusCode::OK) + ).collect().unwrap().and_then(|ps| { + assert_eq!(2, ps.len()); + Ok(()) + }); + h2.drive(check_status.join(check_pushed)).and_then(|(conn, _)| conn.expect("client")) }); h2.join(mock).wait().unwrap(); @@ -189,9 +255,58 @@ fn recv_push_promise_over_max_header_list_size() { } #[test] -#[ignore] -fn recv_push_promise_with_unsafe_method_is_stream_error() { - // for instance, when :method = POST +fn recv_invalid_push_promise_headers_is_stream_protocol_error() { + // Unsafe method or content length is stream protocol error + let _ = ::env_logger::try_init(); + + let (io, srv) = mock::new(); + let mock = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .send_frame(frames::headers(1).response(404)) + .send_frame(frames::push_promise(1, 2).request("POST", "https://http2.akamai.com/style.css")) + .send_frame( + frames::push_promise(1, 4) + .request("GET", "https://http2.akamai.com/style.css") + .field(http::header::CONTENT_LENGTH, 1) + ) + .send_frame( + frames::push_promise(1, 6) + .request("GET", "https://http2.akamai.com/style.css") + .field(http::header::CONTENT_LENGTH, 0) + ) + .send_frame(frames::headers(1).response(404).eos()) + .recv_frame(frames::reset(2).protocol_error()) + .recv_frame(frames::reset(4).protocol_error()) + .send_frame(frames::headers(6).response(200).eos()) + .close(); + + let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| { + let request = Request::builder() + .method(Method::GET) + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + let (mut resp, _) = client + .send_request(request, true) + .unwrap(); + let check_pushed_request = resp.push_promises().and_then(|headers| { + headers.into_parts().1 + }); + let check_pushed_response = check_pushed_request + .collect().unwrap().map(|ps| { + // CONTENT_LENGTH = 0 is ok + assert_eq!(1, ps.len()) + }); + h2.drive(check_pushed_response) + }); + + h2.join(mock).wait().unwrap(); } #[test] @@ -202,8 +317,6 @@ fn recv_push_promise_with_wrong_authority_is_stream_error() { #[test] fn recv_push_promise_skipped_stream_id() { - // tests that by default, received push promises work - // TODO: once API exists, read the pushed response let _ = ::env_logger::try_init(); let (io, srv) = mock::new(); @@ -218,8 +331,7 @@ fn recv_push_promise_skipped_stream_id() { .send_frame(frames::push_promise(1, 4).request("GET", "https://http2.akamai.com/style.css")) .send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css")) .recv_frame(frames::go_away(0).protocol_error()) - .close() - ; + .close(); let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| { let request = Request::builder() @@ -255,8 +367,6 @@ fn recv_push_promise_skipped_stream_id() { #[test] fn recv_push_promise_dup_stream_id() { - // tests that by default, received push promises work - // TODO: once API exists, read the pushed response let _ = ::env_logger::try_init(); let (io, srv) = mock::new(); @@ -271,8 +381,7 @@ fn recv_push_promise_dup_stream_id() { .send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css")) .send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css")) .recv_frame(frames::go_away(0).protocol_error()) - .close() - ; + .close(); let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| { let request = Request::builder()