Add test and assertion for idle state handling (#160)
This commit is contained in:
		
				
					committed by
					
						 Carl Lerche
						Carl Lerche
					
				
			
			
				
	
			
			
			
						parent
						
							b1d282799b
						
					
				
				
					commit
					8a1c4d3d52
				
			| @@ -1,7 +1,7 @@ | |||||||
| use super::*; | use super::*; | ||||||
| use super::store::Resolve; | use super::store::Resolve; | ||||||
|  |  | ||||||
| use frame::Reason; | use frame::{Reason, StreamId}; | ||||||
|  |  | ||||||
| use codec::UserError; | use codec::UserError; | ||||||
| use codec::UserError::*; | use codec::UserError::*; | ||||||
| @@ -11,9 +11,18 @@ use bytes::buf::Take; | |||||||
| use std::{cmp, fmt}; | use std::{cmp, fmt}; | ||||||
| use std::io; | use std::io; | ||||||
|  |  | ||||||
|  | /// # Warning | ||||||
|  | /// | ||||||
|  | /// Queued streans are ordered by stream ID, as we need to ensure that | ||||||
|  | /// lower-numbered streams are sent headers before higher-numbered ones. | ||||||
|  | /// This is because "idle" stream IDs – those which have been initiated but | ||||||
|  | /// have yet to receive frames – will be implicitly closed on receipt of a | ||||||
|  | /// frame on a higher stream ID. If these queues was not ordered by stream | ||||||
|  | /// IDs, some mechanism would be necessary to ensure that the lowest-numberedh] | ||||||
|  | /// idle stream is opened first. | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| pub(super) struct Prioritize { | pub(super) struct Prioritize { | ||||||
|     /// Queue of streams waiting for socket capacity to send a frame |     /// Queue of streams waiting for socket capacity to send a frame. | ||||||
|     pending_send: store::Queue<stream::NextSend>, |     pending_send: store::Queue<stream::NextSend>, | ||||||
|  |  | ||||||
|     /// Queue of streams waiting for window capacity to produce data. |     /// Queue of streams waiting for window capacity to produce data. | ||||||
| @@ -24,6 +33,9 @@ pub(super) struct Prioritize { | |||||||
|  |  | ||||||
|     /// Connection level flow control governing sent data |     /// Connection level flow control governing sent data | ||||||
|     flow: FlowControl, |     flow: FlowControl, | ||||||
|  |  | ||||||
|  |     /// Stream ID of the last stream opened. | ||||||
|  |     last_opened_id: StreamId, | ||||||
| } | } | ||||||
|  |  | ||||||
| pub(crate) struct Prioritized<B> { | pub(crate) struct Prioritized<B> { | ||||||
| @@ -55,6 +67,7 @@ impl Prioritize { | |||||||
|             pending_capacity: store::Queue::new(), |             pending_capacity: store::Queue::new(), | ||||||
|             pending_open: store::Queue::new(), |             pending_open: store::Queue::new(), | ||||||
|             flow: flow, |             flow: flow, | ||||||
|  |             last_opened_id: StreamId::ZERO | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -617,6 +630,11 @@ impl Prioritize { | |||||||
|  |  | ||||||
|                     trace!("pop_frame; frame={:?}", frame); |                     trace!("pop_frame; frame={:?}", frame); | ||||||
|  |  | ||||||
|  |                     if cfg!(debug_assertions) && stream.state.is_idle() { | ||||||
|  |                         debug_assert!(stream.id > self.last_opened_id); | ||||||
|  |                         self.last_opened_id = stream.id; | ||||||
|  |                     } | ||||||
|  |  | ||||||
|                     if !stream.pending_send.is_empty() || stream.state.is_canceled() { |                     if !stream.pending_send.is_empty() || stream.state.is_canceled() { | ||||||
|                         // TODO: Only requeue the sender IF it is ready to send |                         // TODO: Only requeue the sender IF it is ready to send | ||||||
|                         // the next frame. i.e. don't requeue it if the next |                         // the next frame. i.e. don't requeue it if the next | ||||||
| @@ -640,6 +658,7 @@ impl Prioritize { | |||||||
|         while counts.can_inc_num_send_streams() { |         while counts.can_inc_num_send_streams() { | ||||||
|             if let Some(mut stream) = self.pending_open.pop(store) { |             if let Some(mut stream) = self.pending_open.pop(store) { | ||||||
|                 trace!("schedule_pending_open; stream={:?}", stream.id); |                 trace!("schedule_pending_open; stream={:?}", stream.id); | ||||||
|  |  | ||||||
|                 counts.inc_num_send_streams(); |                 counts.inc_num_send_streams(); | ||||||
|                 self.pending_send.push(&mut stream); |                 self.pending_send.push(&mut stream); | ||||||
|                 if let Some(task) = stream.open_task.take() { |                 if let Some(task) = stream.open_task.take() { | ||||||
|   | |||||||
| @@ -331,6 +331,13 @@ impl State { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn is_idle(&self) -> bool { | ||||||
|  |         match self.inner { | ||||||
|  |             Idle => true, | ||||||
|  |             _ => false, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> { |     pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> { | ||||||
|         use std::io; |         use std::io; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -369,6 +369,60 @@ fn recv_goaway_finishes_processed_streams() { | |||||||
|     h2.join(srv).wait().expect("wait"); |     h2.join(srv).wait().expect("wait"); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn skipped_stream_ids_are_implicitly_closed() { | ||||||
|  |     let _ = ::env_logger::init(); | ||||||
|  |     let (io, srv) = mock::new(); | ||||||
|  |  | ||||||
|  |     let srv = srv | ||||||
|  |         .assert_client_handshake() | ||||||
|  |         .expect("handshake") | ||||||
|  |         .recv_settings() | ||||||
|  |         .recv_frame(frames::headers(5) | ||||||
|  |             .request("GET", "https://example.com/") | ||||||
|  |             .eos(), | ||||||
|  |         ) | ||||||
|  |         // send the response on a lower-numbered stream, which should be | ||||||
|  |         // implicitly closed. | ||||||
|  |         .send_frame(frames::headers(3).response(200)); | ||||||
|  |  | ||||||
|  |         let h2 = Client::builder() | ||||||
|  |             .initial_stream_id(5) | ||||||
|  |             .handshake::<_, Bytes>(io) | ||||||
|  |             .expect("handshake") | ||||||
|  |             .and_then(|(mut client, h2)| { | ||||||
|  |                 let request = Request::builder() | ||||||
|  |                     .method(Method::GET) | ||||||
|  |                     .uri("https://example.com/") | ||||||
|  |                     .body(()) | ||||||
|  |                     .unwrap(); | ||||||
|  |  | ||||||
|  |                 let req = client.send_request(request, true) | ||||||
|  |                     .unwrap() | ||||||
|  |                     .0.then(|res| { | ||||||
|  |                         let err = res.unwrap_err(); | ||||||
|  |                         assert_eq!( | ||||||
|  |                             err.to_string(), | ||||||
|  |                             "protocol error: unspecific protocol error detected"); | ||||||
|  |                         Ok::<(), ()>(()) | ||||||
|  |                     }); | ||||||
|  |                     // client should see a conn error | ||||||
|  |                     let conn = h2.then(|res| { | ||||||
|  |                         let err = res.unwrap_err(); | ||||||
|  |                         assert_eq!( | ||||||
|  |                             err.to_string(), | ||||||
|  |                             "protocol error: unspecific protocol error detected" | ||||||
|  |                         ); | ||||||
|  |                         Ok::<(), ()>(()) | ||||||
|  |                     }); | ||||||
|  |                     conn.unwrap().join(req) | ||||||
|  |             }); | ||||||
|  |  | ||||||
|  |  | ||||||
|  |         h2.join(srv).wait().expect("wait"); | ||||||
|  |  | ||||||
|  | } | ||||||
|  |  | ||||||
| /* | /* | ||||||
| #[test] | #[test] | ||||||
| fn send_data_after_headers_eos() { | fn send_data_after_headers_eos() { | ||||||
|   | |||||||
| @@ -15,6 +15,30 @@ pub trait FutureExt: Future { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Panic on success, yielding the content of an `Err`. | ||||||
|  |     fn unwrap_err(self) -> UnwrapErr<Self> | ||||||
|  |     where | ||||||
|  |         Self: Sized, | ||||||
|  |         Self::Error: fmt::Debug, | ||||||
|  |     { | ||||||
|  |         UnwrapErr { | ||||||
|  |             inner: self, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Panic on success, with a message. | ||||||
|  |     fn expect_err<T>(self, msg: T) -> ExpectErr<Self> | ||||||
|  |     where | ||||||
|  |         Self: Sized, | ||||||
|  |         Self::Error: fmt::Debug, | ||||||
|  |         T: fmt::Display, | ||||||
|  |     { | ||||||
|  |         ExpectErr{ | ||||||
|  |             inner: self, | ||||||
|  |             msg: msg.to_string(), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /// Panic on error, with a message. |     /// Panic on error, with a message. | ||||||
|     fn expect<T>(self, msg: T) -> Expect<Self> |     fn expect<T>(self, msg: T) -> Expect<Self> | ||||||
|     where |     where | ||||||
| @@ -68,6 +92,32 @@ where | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // ===== UnwrapErr ====== | ||||||
|  |  | ||||||
|  | /// Panic on success. | ||||||
|  | pub struct UnwrapErr<T> { | ||||||
|  |     inner: T, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<T> Future for UnwrapErr<T> | ||||||
|  | where | ||||||
|  |     T: Future, | ||||||
|  |     T::Item: fmt::Debug, | ||||||
|  |     T::Error: fmt::Debug, | ||||||
|  | { | ||||||
|  |     type Item = T::Error; | ||||||
|  |     type Error = (); | ||||||
|  |  | ||||||
|  |     fn poll(&mut self) -> Poll<T::Error, ()> { | ||||||
|  |         let poll = | ||||||
|  |             self.inner.poll() | ||||||
|  |                 .map_err(Async::Ready) | ||||||
|  |                 .unwrap_err(); | ||||||
|  |         Ok(poll) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| // ===== Expect ====== | // ===== Expect ====== | ||||||
|  |  | ||||||
| @@ -91,6 +141,32 @@ where | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // ===== ExpectErr ====== | ||||||
|  |  | ||||||
|  | /// Panic on success | ||||||
|  | pub struct ExpectErr<T> { | ||||||
|  |     inner: T, | ||||||
|  |     msg: String, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<T> Future for ExpectErr<T> | ||||||
|  | where | ||||||
|  |     T: Future, | ||||||
|  |     T::Item: fmt::Debug, | ||||||
|  |     T::Error: fmt::Debug, | ||||||
|  | { | ||||||
|  |     type Item = T::Error; | ||||||
|  |     type Error = (); | ||||||
|  |  | ||||||
|  |     fn poll(&mut self) -> Poll<T::Error, ()> { | ||||||
|  |         let poll = | ||||||
|  |             self.inner.poll() | ||||||
|  |                 .map_err(Async::Ready) | ||||||
|  |                 .expect_err(&self.msg); | ||||||
|  |         Ok(poll) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| // ===== Drive ====== | // ===== Drive ====== | ||||||
|  |  | ||||||
| /// Drive a future to completion while also polling the driver | /// Drive a future to completion while also polling the driver | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user