From 8a1c4d3d524e2e766dc3cd3d7cad823e31416173 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Fri, 27 Oct 2017 14:14:00 -0700
Subject: [PATCH] Add test and assertion for idle state handling (#160)

---
 src/proto/streams/prioritize.rs | 23 +++++++++-
 src/proto/streams/state.rs      |  7 +++
 tests/stream_states.rs          | 54 +++++++++++++++++++++++
 tests/support/future_ext.rs     | 76 +++++++++++++++++++++++++++++++++
 4 files changed, 158 insertions(+), 2 deletions(-)

diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs
index b34d370..ac9866b 100644
--- a/src/proto/streams/prioritize.rs
+++ b/src/proto/streams/prioritize.rs
@@ -1,7 +1,7 @@
 use super::*;
 use super::store::Resolve;
 
-use frame::Reason;
+use frame::{Reason, StreamId};
 
 use codec::UserError;
 use codec::UserError::*;
@@ -11,9 +11,18 @@ use bytes::buf::Take;
 use std::{cmp, fmt};
 use std::io;
 
+/// # Warning
+///
+/// Queued streams are ordered by stream ID, as we need to ensure that
+/// lower-numbered streams are sent headers before higher-numbered ones.
+/// This is because "idle" stream IDs – those which have been initiated but
+/// have yet to receive frames – will be implicitly closed on receipt of a
+/// frame on a higher stream ID. If these queues were not ordered by stream
+/// IDs, some mechanism would be necessary to ensure that the lowest-numbered
+/// idle stream is opened first.
 #[derive(Debug)]
 pub(super) struct Prioritize {
-    /// Queue of streams waiting for socket capacity to send a frame
+    /// Queue of streams waiting for socket capacity to send a frame.
     pending_send: store::Queue,
 
     /// Queue of streams waiting for window capacity to produce data.
@@ -24,6 +33,9 @@ pub(super) struct Prioritize {
 
     /// Connection level flow control governing sent data
     flow: FlowControl,
+
+    /// Stream ID of the last stream opened.
+    last_opened_id: StreamId,
 }
 
 pub(crate) struct Prioritized {
@@ -55,6 +67,7 @@ impl Prioritize {
             pending_capacity: store::Queue::new(),
             pending_open: store::Queue::new(),
             flow: flow,
+            last_opened_id: StreamId::ZERO
         }
     }
 
@@ -617,6 +630,11 @@ impl Prioritize {
 
                     trace!("pop_frame; frame={:?}", frame);
 
+                    if cfg!(debug_assertions) && stream.state.is_idle() {
+                        debug_assert!(stream.id > self.last_opened_id);
+                        self.last_opened_id = stream.id;
+                    }
+
                     if !stream.pending_send.is_empty() || stream.state.is_canceled() {
                         // TODO: Only requeue the sender IF it is ready to send
                         // the next frame. i.e. don't requeue it if the next
@@ -640,6 +658,7 @@ impl Prioritize {
         while counts.can_inc_num_send_streams() {
             if let Some(mut stream) = self.pending_open.pop(store) {
                 trace!("schedule_pending_open; stream={:?}", stream.id);
+                counts.inc_num_send_streams();
                 self.pending_send.push(&mut stream);
 
                 if let Some(task) = stream.open_task.take() {
diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs
index d72dc84..ae50427 100644
--- a/src/proto/streams/state.rs
+++ b/src/proto/streams/state.rs
@@ -331,6 +331,13 @@ impl State {
         }
     }
 
+    pub fn is_idle(&self) -> bool {
+        match self.inner {
+            Idle => true,
+            _ => false,
+        }
+    }
+
     pub fn ensure_recv_open(&self) -> Result {
         use std::io;
 
diff --git a/tests/stream_states.rs b/tests/stream_states.rs
index f94185c..f16b39c 100644
--- a/tests/stream_states.rs
+++ b/tests/stream_states.rs
@@ -369,6 +369,60 @@ fn recv_goaway_finishes_processed_streams() {
     h2.join(srv).wait().expect("wait");
 }
 
+#[test]
+fn skipped_stream_ids_are_implicitly_closed() {
+    let _ = ::env_logger::init();
+    let (io, srv) = mock::new();
+
+    let srv = srv
+        .assert_client_handshake()
+        .expect("handshake")
+        .recv_settings()
+        .recv_frame(frames::headers(5)
+            .request("GET", "https://example.com/")
+            .eos(),
+        )
+        // send the response on a lower-numbered stream, which should be
+        // implicitly closed.
+        .send_frame(frames::headers(3).response(200));
+
+    let h2 = Client::builder()
+        .initial_stream_id(5)
+        .handshake::<_, Bytes>(io)
+        .expect("handshake")
+        .and_then(|(mut client, h2)| {
+            let request = Request::builder()
+                .method(Method::GET)
+                .uri("https://example.com/")
+                .body(())
+                .unwrap();
+
+            let req = client.send_request(request, true)
+                .unwrap()
+                .0.then(|res| {
+                    let err = res.unwrap_err();
+                    assert_eq!(
+                        err.to_string(),
+                        "protocol error: unspecific protocol error detected");
+                    Ok::<(), ()>(())
+                });
+            // client should see a conn error
+            let conn = h2.then(|res| {
+                let err = res.unwrap_err();
+                assert_eq!(
+                    err.to_string(),
+                    "protocol error: unspecific protocol error detected"
+                );
+                Ok::<(), ()>(())
+            });
+            conn.unwrap().join(req)
+        });
+
+
+    h2.join(srv).wait().expect("wait");
+
+}
+
 /*
 #[test]
 fn send_data_after_headers_eos() {
diff --git a/tests/support/future_ext.rs b/tests/support/future_ext.rs
index a156c24..9575bc1 100644
--- a/tests/support/future_ext.rs
+++ b/tests/support/future_ext.rs
@@ -15,6 +15,30 @@
         }
     }
 
+    /// Panic on success, yielding the content of an `Err`.
+    fn unwrap_err(self) -> UnwrapErr<Self>
+    where
+        Self: Sized,
+        Self::Error: fmt::Debug,
+    {
+        UnwrapErr {
+            inner: self,
+        }
+    }
+
+    /// Panic on success, with a message.
+    fn expect_err<T>(self, msg: T) -> ExpectErr<Self>
+    where
+        Self: Sized,
+        Self::Error: fmt::Debug,
+        T: fmt::Display,
+    {
+        ExpectErr {
+            inner: self,
+            msg: msg.to_string(),
+        }
+    }
+
     /// Panic on error, with a message.
     fn expect<T>(self, msg: T) -> Expect<Self>
     where
@@ -68,6 +92,32 @@
     }
 }
 
+// ===== UnwrapErr ======
+
+/// Panic on success.
+pub struct UnwrapErr<T> {
+    inner: T,
+}
+
+impl<T> Future for UnwrapErr<T>
+where
+    T: Future,
+    T::Item: fmt::Debug,
+    T::Error: fmt::Debug,
+{
+    type Item = T::Error;
+    type Error = ();
+
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        let poll =
+            self.inner.poll()
+                .map_err(Async::Ready)
+                .unwrap_err();
+        Ok(poll)
+    }
+}
+
+
 // ===== Expect ======
 
@@ -91,6 +141,32 @@
     }
 }
 
+// ===== ExpectErr ======
+
+/// Panic on success.
+pub struct ExpectErr<T> {
+    inner: T,
+    msg: String,
+}
+
+impl<T> Future for ExpectErr<T>
+where
+    T: Future,
+    T::Item: fmt::Debug,
+    T::Error: fmt::Debug,
+{
+    type Item = T::Error;
+    type Error = ();
+
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        let poll =
+            self.inner.poll()
+                .map_err(Async::Ready)
+                .expect_err(&self.msg);
+        Ok(poll)
+    }
+}
+
 // ===== Drive ======
 
 /// Drive a future to completion while also polling the driver
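
The invariant behind the new debug_assert! in pop_frame can be illustrated in isolation. The sketch below is not part of the patch: it models StreamId as a plain u32 and uses a hypothetical OpenOrderCheck type, while the real code keeps last_opened_id on Prioritize. The point is the same one the doc comment above makes: the first use of a stream ID implicitly closes all lower-numbered idle streams, so idle streams must be opened in strictly increasing ID order, which is also why pending_open is kept ordered by stream ID.

// Standalone sketch (not part of the patch). `OpenOrderCheck` and the u32
// stream ID are illustrative stand-ins for `Prioritize::last_opened_id`.
#[derive(Debug, Default)]
struct OpenOrderCheck {
    /// ID of the last stream opened, mirroring `Prioritize::last_opened_id`.
    last_opened_id: u32,
}

impl OpenOrderCheck {
    /// Record that a previously idle stream is now being opened.
    ///
    /// Opening stream N implicitly closes every lower-numbered idle stream,
    /// so an idle stream may only be opened if its ID is strictly greater
    /// than the last one opened (the condition the patch asserts with
    /// `debug_assert!(stream.id > self.last_opened_id)`).
    fn open(&mut self, id: u32) {
        assert!(
            id > self.last_opened_id,
            "stream {} was implicitly closed when stream {} opened",
            id,
            self.last_opened_id
        );
        self.last_opened_id = id;
    }
}

fn main() {
    let mut check = OpenOrderCheck::default();
    check.open(1);
    check.open(5);
    // check.open(3); // would panic: 3 became closed when 5 was opened
    println!("last opened stream id: {}", check.last_opened_id);
}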