From f8efb053b9acca77446d3f242784a0f240ebf5e0 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 28 Sep 2017 16:55:12 -0700 Subject: [PATCH] split Client into (Client, Connection) (#107) The Connection type is a `Future` that drives all of the IO of the client connection. The Client type is separate, and is used to send requests into the connection. --- examples/akamai.rs | 4 +- examples/client.rs | 6 +- src/client.rs | 120 +++++++++++++++++++---------- src/proto/connection.rs | 33 +------- src/proto/mod.rs | 2 +- src/proto/streams/counts.rs | 48 ++---------- src/proto/streams/mod.rs | 1 + src/proto/streams/prioritize.rs | 41 ++++++++-- src/proto/streams/send.rs | 22 +++--- src/proto/streams/state.rs | 5 +- src/proto/streams/store.rs | 73 +++++++++++++----- src/proto/streams/stream.rs | 43 +++++++++++ src/proto/streams/streams.rs | 78 ++++++++++++++++--- tests/client_request.rs | 131 +++++++++++++++++++++++++------- tests/flow_control.rs | 52 +++++++------ tests/ping_pong.rs | 4 +- tests/prioritization.rs | 16 ++-- tests/push_promise.rs | 9 ++- tests/stream_states.rs | 45 ++++++----- tests/support/mock.rs | 2 +- tests/support/mod.rs | 6 ++ tests/support/prelude.rs | 2 +- tests/trailers.rs | 8 +- 23 files changed, 489 insertions(+), 262 deletions(-) diff --git a/examples/akamai.rs b/examples/akamai.rs index 8897b29..fe1965a 100644 --- a/examples/akamai.rs +++ b/examples/akamai.rs @@ -67,7 +67,7 @@ pub fn main() { Client::handshake(tls) }) .then(|res| { - let mut h2 = res.unwrap(); + let (mut client, h2) = res.unwrap(); let request = Request::builder() .method(Method::GET) @@ -75,7 +75,7 @@ pub fn main() { .body(()) .unwrap(); - let stream = h2.send_request(request, true).unwrap(); + let stream = client.send_request(request, true).unwrap(); let stream = stream.and_then(|response| { let (_, body) = response.into_parts(); diff --git a/examples/client.rs b/examples/client.rs index 9e206c9..bb6f2aa 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -58,7 +58,7 @@ pub fn main() { let tcp = io_dump::Dump::to_stdout(res.unwrap()); Client::handshake(tcp) }).then(|res| { - let mut client = res.unwrap(); + let (mut client, h2) = res.unwrap(); println!("sending request"); @@ -75,8 +75,8 @@ pub fn main() { // send trailers stream.send_trailers(trailers).unwrap(); - // Spawn a task to run the client... - handle.spawn(client.map_err(|e| println!("GOT ERR={:?}", e))); + // Spawn a task to run the conn... + handle.spawn(h2.map_err(|e| println!("GOT ERR={:?}", e))); stream .and_then(|response| { diff --git a/src/client.rs b/src/client.rs index 23d6397..b1add55 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,7 @@ use codec::{Codec, RecvError}; use frame::{Headers, Pseudo, Settings, StreamId}; use frame::Reason::*; -use proto::{self, Connection, WindowSize}; +use proto::{self, WindowSize}; use bytes::{Bytes, IntoBuf}; use futures::{Async, Future, MapErr, Poll}; @@ -21,8 +21,13 @@ pub struct Handshake { } /// Marker type indicating a client peer -pub struct Client { - connection: Connection, +pub struct Client { + inner: proto::Streams, + pending: Option, +} + +pub struct Connection { + inner: proto::Connection, } #[derive(Debug)] @@ -45,10 +50,9 @@ pub struct Builder { #[derive(Debug)] pub(crate) struct Peer; -impl Client -where - T: AsyncRead + AsyncWrite, -{ +// ===== impl Client ===== + +impl Client { /// Bind an H2 client connection. /// /// Returns a future which resolves to the connection value once the H2 @@ -56,24 +60,29 @@ where /// /// It's important to note that this does not **flush** the outbound /// settings to the wire. - pub fn handshake(io: T) -> Handshake { + pub fn handshake(io: T) -> Handshake + where + T: AsyncRead + AsyncWrite, + { Builder::default().handshake(io) } } -impl Client<(), Bytes> { +impl Client { /// Creates a Client Builder to customize a Client before binding. pub fn builder() -> Builder { Builder::default() } } -impl Client +impl Client where - T: AsyncRead + AsyncWrite, B: IntoBuf, { - fn handshake2(io: T, builder: Builder) -> Handshake { + fn handshake2(io: T, builder: Builder) -> Handshake + where + T: AsyncRead + AsyncWrite, + { use tokio_io::io; debug!("binding client connection"); @@ -91,7 +100,9 @@ where /// Returns `Ready` when the connection can initialize a new HTTP 2.0 /// stream. pub fn poll_ready(&mut self) -> Poll<(), ::Error> { - self.connection.poll_send_request_ready() + try_ready!(self.inner.poll_pending_open(self.pending.as_ref())); + self.pending = None; + Ok(().into()) } /// Send a request on a new HTTP 2.0 stream @@ -100,10 +111,13 @@ where request: Request<()>, end_of_stream: bool, ) -> Result, ::Error> { - self.connection - .send_request(request, end_of_stream) + self.inner + .send_request(request, end_of_stream, self.pending.as_ref()) .map_err(Into::into) .map(|stream| { + if stream.is_pending_open() { + self.pending = Some(stream.key()); + } Stream { inner: stream, } @@ -111,37 +125,30 @@ where } } -impl Future for Client +impl fmt::Debug for Client where - T: AsyncRead + AsyncWrite, B: IntoBuf, { - type Item = (); - type Error = ::Error; - - fn poll(&mut self) -> Poll<(), ::Error> { - self.connection.poll().map_err(Into::into) + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Client").finish() } } -impl fmt::Debug for Client +impl Clone for Client where - T: AsyncRead + AsyncWrite, - T: fmt::Debug, - B: fmt::Debug + IntoBuf, - B::Buf: fmt::Debug, + B: IntoBuf, { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("Client") - .field("connection", &self.connection) - .finish() + fn clone(&self) -> Self { + Client { + inner: self.inner.clone(), + pending: None, + } } } #[cfg(feature = "unstable")] -impl Client +impl Client where - T: AsyncRead + AsyncWrite, B: IntoBuf, { /// Returns the number of active streams. @@ -149,7 +156,7 @@ where /// An active stream is a stream that has not yet transitioned to a closed /// state. pub fn num_active_streams(&self) -> usize { - self.connection.num_active_streams() + self.inner.num_active_streams() } /// Returns the number of streams that are held in memory. @@ -158,7 +165,7 @@ where /// stay in memory for some reason. For example, there are still outstanding /// userspace handles pointing to the slot. pub fn num_wired_streams(&self) -> usize { - self.connection.num_wired_streams() + self.inner.num_wired_streams() } } @@ -219,13 +226,40 @@ impl Default for Builder { } } +// ===== impl Connection ===== + +impl Future for Connection +where + T: AsyncRead + AsyncWrite, + B: IntoBuf, +{ + type Item = (); + type Error = ::Error; + + fn poll(&mut self) -> Poll<(), ::Error> { + self.inner.poll().map_err(Into::into) + } +} + +impl fmt::Debug for Connection +where + T: AsyncRead + AsyncWrite, + T: fmt::Debug, + B: fmt::Debug + IntoBuf, + B::Buf: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.inner, fmt) + } +} + // ===== impl Handshake ===== impl Future for Handshake where T: AsyncRead + AsyncWrite, { - type Item = Client; + type Item = (Client, Connection); type Error = ::Error; fn poll(&mut self) -> Poll { @@ -245,10 +279,16 @@ where .buffer(self.builder.settings.clone().into()) .expect("invalid SETTINGS frame"); - let connection = Connection::new(codec, &self.builder.settings, self.builder.stream_id); - Ok(Async::Ready(Client { - connection, - })) + let connection = + proto::Connection::new(codec, &self.builder.settings, self.builder.stream_id); + let client = Client { + inner: connection.streams().clone(), + pending: None, + }; + let conn = Connection { + inner: connection, + }; + Ok(Async::Ready((client, conn))) } } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index d4e4e94..c2b6fdf 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,5 +1,5 @@ use {client, frame, proto, server}; -use codec::{RecvError, SendError}; +use codec::RecvError; use frame::Reason; use frame::DEFAULT_INITIAL_WINDOW_SIZE; @@ -7,7 +7,6 @@ use proto::*; use bytes::{Bytes, IntoBuf}; use futures::Stream; -use http::Request; use tokio_io::{AsyncRead, AsyncWrite}; use std::marker::PhantomData; @@ -249,18 +248,8 @@ where T: AsyncRead + AsyncWrite, B: IntoBuf, { - /// Returns `Ready` when new the connection is able to support a new request stream. - pub fn poll_send_request_ready(&mut self) -> Poll<(), ::Error> { - self.streams.poll_send_request_ready() - } - - /// Initialize a new HTTP/2.0 stream and send the message. - pub fn send_request( - &mut self, - request: Request<()>, - end_of_stream: bool, - ) -> Result, SendError> { - self.streams.send_request(request, end_of_stream) + pub(crate) fn streams(&self) -> &Streams { + &self.streams } } @@ -273,19 +262,3 @@ where self.streams.next_incoming() } } - -#[cfg(feature = "unstable")] -impl Connection -where - T: AsyncRead + AsyncWrite, - P: Peer, - B: IntoBuf, -{ - pub fn num_active_streams(&self) -> usize { - self.streams.num_active_streams() - } - - pub fn num_wired_streams(&self) -> usize { - self.streams.num_wired_streams() - } -} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index a6cbf53..cebd303 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -8,7 +8,7 @@ mod streams; pub(crate) use self::connection::Connection; pub(crate) use self::error::Error; pub(crate) use self::peer::Peer; -pub(crate) use self::streams::{StreamRef, Streams}; +pub(crate) use self::streams::{Key as StreamKey, StreamRef, Streams}; use codec::Codec; diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index 168e4f2..cd7d5b9 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -1,5 +1,4 @@ use super::*; -use client; use std::marker::PhantomData; use std::usize; @@ -10,20 +9,17 @@ where P: Peer, { /// Maximum number of locally initiated streams - max_send_streams: Option, + max_send_streams: usize, /// Current number of remote initiated streams num_send_streams: usize, /// Maximum number of remote initiated streams - max_recv_streams: Option, + max_recv_streams: usize, /// Current number of locally initiated streams num_recv_streams: usize, - /// Task awaiting notification to open a new stream. - blocked_open: Option, - _p: PhantomData

, } @@ -34,22 +30,17 @@ where /// Create a new `Counts` using the provided configuration values. pub fn new(config: &Config) -> Self { Counts { - max_send_streams: config.local_max_initiated, + max_send_streams: config.local_max_initiated.unwrap_or(usize::MAX), num_send_streams: 0, - max_recv_streams: config.remote_max_initiated, + max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), num_recv_streams: 0, - blocked_open: None, _p: PhantomData, } } /// Returns true if the receive stream concurrency can be incremented pub fn can_inc_num_recv_streams(&self) -> bool { - if let Some(max) = self.max_recv_streams { - max > self.num_recv_streams - } else { - true - } + self.max_recv_streams > self.num_recv_streams } /// Increments the number of concurrent receive streams. @@ -66,11 +57,7 @@ where /// Returns true if the send stream concurrency can be incremented pub fn can_inc_num_send_streams(&self) -> bool { - if let Some(max) = self.max_send_streams { - max > self.num_send_streams - } else { - true - } + self.max_send_streams > self.num_send_streams } /// Increments the number of concurrent send streams. @@ -87,7 +74,7 @@ where pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { if let Some(val) = settings.max_concurrent_streams() { - self.max_send_streams = Some(val as usize); + self.max_send_streams = val as usize; } } @@ -99,7 +86,7 @@ where where F: FnOnce(&mut Self, &mut store::Ptr) -> U, { - let is_counted = stream.state.is_counted(); + let is_counted = stream.is_counted(); // Run the action let ret = f(self, &mut stream); @@ -127,29 +114,10 @@ where } fn dec_num_streams(&mut self, id: StreamId) { - use std::usize; - if P::is_local_init(id) { self.num_send_streams -= 1; - - if self.num_send_streams < self.max_send_streams.unwrap_or(usize::MAX) { - if let Some(task) = self.blocked_open.take() { - task.notify(); - } - } } else { self.num_recv_streams -= 1; } } } - -impl Counts { - pub fn poll_open_ready(&mut self) -> Async<()> { - if !self.can_inc_num_send_streams() { - self.blocked_open = Some(task::current()); - return Async::NotReady; - } - - return Async::Ready(()); - } -} diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index f5b6f25..7b98f42 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -10,6 +10,7 @@ mod stream; mod streams; pub(crate) use self::prioritize::Prioritized; +pub(crate) use self::store::Key; pub(crate) use self::streams::{StreamRef, Streams}; use self::buffer::Buffer; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index b0f2f1d..b02988f 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -22,6 +22,9 @@ where /// Queue of streams waiting for window capacity to produce data. pending_capacity: store::Queue, + /// Streams waiting for capacity due to max concurrency + pending_open: store::Queue, + /// Connection level flow control governing sent data flow: FlowControl, @@ -60,6 +63,7 @@ where Prioritize { pending_send: store::Queue::new(), pending_capacity: store::Queue::new(), + pending_open: store::Queue::new(), flow: flow, buffer: Buffer::new(), } @@ -75,15 +79,22 @@ where // Queue the frame in the buffer stream.pending_send.push_back(&mut self.buffer, frame); - // Queue the stream - self.pending_send.push(stream); + // If the stream is waiting to be opened, nothing more to do. + if !stream.is_pending_open { + // Queue the stream + self.pending_send.push(stream); - // Notify the connection. - if let Some(task) = task.take() { - task.notify(); + // Notify the connection. + if let Some(task) = task.take() { + task.notify(); + } } } + pub fn queue_open(&mut self, stream: &mut store::Ptr) { + self.pending_open.push(stream); + } + /// Send a data frame pub fn send_data( &mut self, @@ -371,6 +382,7 @@ where trace!("poll_complete"); loop { + self.schedule_pending_open(store, counts); match self.pop_frame(store, max_frame_len, counts) { Some(frame) => { trace!("writing frame={:?}", frame); @@ -482,7 +494,7 @@ where trace!("pop_frame; stream={:?}", stream.id); debug_assert!(!stream.pending_send.is_empty()); - let is_counted = stream.state.is_counted(); + let is_counted = stream.is_counted(); let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() { Frame::Data(mut frame) => { @@ -594,6 +606,23 @@ where } } } + + fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts

) { + trace!("schedule_pending_open"); + // check for any pending open streams + while counts.can_inc_num_send_streams() { + if let Some(mut stream) = self.pending_open.pop(store) { + trace!("schedule_pending_open; stream={:?}", stream.id); + counts.inc_num_send_streams(); + self.pending_send.push(&mut stream); + if let Some(task) = stream.open_task.take() { + task.notify(); + } + } else { + return; + } + } + } } // ===== impl Prioritized ===== diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 114c43d..6e24912 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -43,20 +43,9 @@ where self.init_window_sz } - /// Update state reflecting a new, locally opened stream - /// - /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self, counts: &mut Counts

) -> Result { - if !counts.can_inc_num_send_streams() { - return Err(Rejected.into()); - } - + pub fn open(&mut self) -> Result { let stream_id = self.try_open()?; - - // Increment the number of locally initiated streams - counts.inc_num_send_streams(); self.next_stream_id = stream_id.next_id(); - Ok(stream_id) } @@ -64,6 +53,7 @@ where &mut self, frame: frame::Headers, stream: &mut store::Ptr, + counts: &mut Counts

, task: &mut Option, ) -> Result<(), UserError> { trace!( @@ -77,6 +67,14 @@ where // Update the state stream.state.send_open(end_stream)?; + if P::is_local_init(frame.stream_id()) { + if counts.can_inc_num_send_streams() { + counts.inc_num_send_streams(); + } else { + self.prioritize.queue_open(stream); + } + } + // Queue the frame for sending self.prioritize.queue_frame(frame.into(), stream, task); diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 4588677..8687704 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -251,9 +251,8 @@ impl State { } } - /// Returns true if a stream with the current state counts against the - /// concurrency limit. - pub fn is_counted(&self) -> bool { + /// Returns true if a stream is open or half-closed. + pub fn is_at_least_half_open(&self) -> bool { match self.inner { Open { .. diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 75a3bdf..4f674d0 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -13,8 +13,9 @@ pub(super) struct Store where P: Peer, { - slab: slab::Slab>, - ids: OrderMap, + slab: slab::Slab<(StoreId, Stream)>, + ids: OrderMap, + counter: StoreId, } /// "Pointer" to an entry in the store @@ -28,7 +29,12 @@ where /// References an entry in the store. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(super) struct Key(usize); +pub(crate) struct Key { + index: usize, + store_id: StoreId, +} + +type StoreId = usize; #[derive(Debug)] pub(super) struct Queue @@ -64,15 +70,16 @@ pub(super) enum Entry<'a, B: 'a, P: Peer + 'a> { } pub(super) struct OccupiedEntry<'a> { - ids: ordermap::OccupiedEntry<'a, StreamId, usize>, + ids: ordermap::OccupiedEntry<'a, StreamId, (usize, StoreId)>, } pub(super) struct VacantEntry<'a, B: 'a, P> where P: Peer + 'a, { - ids: ordermap::VacantEntry<'a, StreamId, usize>, - slab: &'a mut slab::Slab>, + ids: ordermap::VacantEntry<'a, StreamId, (usize, StoreId)>, + slab: &'a mut slab::Slab<(StoreId, Stream)>, + counter: &'a mut usize, } pub(super) trait Resolve @@ -92,6 +99,7 @@ where Store { slab: slab::Slab::new(), ids: OrderMap::new(), + counter: 0, } } @@ -106,17 +114,25 @@ where }; Some(Ptr { - key: Key(key), + key: Key { + index: key.0, + store_id: key.1, + }, store: self, }) } pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { - let key = self.slab.insert(val); - assert!(self.ids.insert(id, key).is_none()); + let store_id = self.counter; + self.counter = self.counter.wrapping_add(1); + let key = self.slab.insert((store_id, val)); + assert!(self.ids.insert(id, (key, store_id)).is_none()); Ptr { - key: Key(key), + key: Key { + index: key, + store_id, + }, store: self, } } @@ -131,6 +147,7 @@ where Vacant(e) => Entry::Vacant(VacantEntry { ids: e, slab: &mut self.slab, + counter: &mut self.counter, }), } } @@ -147,7 +164,10 @@ where let key = *self.ids.get_index(i).unwrap().1; f(Ptr { - key: Key(key), + key: Key { + index: key.0, + store_id: key.1, + }, store: self, })?; @@ -185,7 +205,9 @@ where type Output = Stream; fn index(&self, key: Key) -> &Self::Output { - self.slab.index(key.0) + let slot = self.slab.index(key.index); + assert_eq!(slot.0, key.store_id); + &slot.1 } } @@ -194,7 +216,9 @@ where P: Peer, { fn index_mut(&mut self, key: Key) -> &mut Self::Output { - self.slab.index_mut(key.0) + let slot = self.slab.index_mut(key.index); + assert_eq!(slot.0, key.store_id); + &mut slot.1 } } @@ -319,7 +343,7 @@ where debug_assert!(!self.store.ids.contains_key(&self.id)); // Remove the stream state - self.store.slab.remove(self.key.0).id + self.store.slab.remove(self.key.index).1.id } /// Remove the StreamId -> stream state association. @@ -351,7 +375,7 @@ where type Target = Stream; fn deref(&self) -> &Stream { - &self.store.slab[self.key.0] + &self.store.slab[self.key.index].1 } } @@ -360,7 +384,7 @@ where P: Peer, { fn deref_mut(&mut self) -> &mut Stream { - &mut self.store.slab[self.key.0] + &mut self.store.slab[self.key.index].1 } } @@ -368,7 +392,11 @@ where impl<'a> OccupiedEntry<'a> { pub fn key(&self) -> Key { - Key(*self.ids.get()) + let tup = self.ids.get(); + Key { + index: tup.0, + store_id: tup.1, + } } } @@ -380,11 +408,16 @@ where { pub fn insert(self, value: Stream) -> Key { // Insert the value in the slab - let key = self.slab.insert(value); + let store_id = *self.counter; + *self.counter = store_id.wrapping_add(1); + let index = self.slab.insert((store_id, value)); // Insert the handle in the ID map - self.ids.insert(key); + self.ids.insert((index, store_id)); - Key(key) + Key { + index, + store_id, + } } } diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 334f3f3..01c3599 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -60,6 +60,15 @@ where /// Set to true when the send capacity has been incremented pub send_capacity_inc: bool, + /// Next node in the open linked list + pub next_open: Option, + + /// Set to true when the stream is pending to be opened + pub is_pending_open: bool, + + /// Task tracking when stream can be "opened", or initially sent to socket. + pub open_task: Option, + // ===== Fields related to receiving ===== /// Next node in the accept linked list pub next_pending_accept: Option, @@ -111,6 +120,9 @@ pub(super) struct NextSendCapacity; #[derive(Debug)] pub(super) struct NextWindowUpdate; +#[derive(Debug)] +pub(super) struct NextOpen; + impl Stream where P: Peer, @@ -150,6 +162,9 @@ where is_pending_send_capacity: false, next_pending_send_capacity: None, send_capacity_inc: false, + is_pending_open: false, + next_open: None, + open_task: None, // ===== Fields related to receiving ===== next_pending_accept: None, @@ -177,6 +192,12 @@ where self.ref_count -= 1; } + /// Returns true if a stream with the current state counts against the + /// concurrency limit. + pub fn is_counted(&self) -> bool { + !self.is_pending_open && self.state.is_at_least_half_open() + } + /// Returns true if the stream is closed pub fn is_closed(&self) -> bool { // The state has fully transitioned to closed. @@ -337,6 +358,28 @@ impl store::Next for NextWindowUpdate { } } +impl store::Next for NextOpen { + fn next(stream: &Stream) -> Option { + stream.next_open + } + + fn set_next(stream: &mut Stream, key: Option) { + stream.next_open = key; + } + + fn take_next(stream: &mut Stream) -> Option { + stream.next_open.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_open + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_pending_open = val; + } +} + // ===== impl ContentLength ===== impl ContentLength { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index d764f23..aa62927 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -323,6 +323,7 @@ where &mut self, request: Request<()>, end_of_stream: bool, + pending: Option<&store::Key>, ) -> Result, SendError> { use super::stream::ContentLength; use http::Method; @@ -336,8 +337,21 @@ where let mut me = self.inner.lock().unwrap(); let me = &mut *me; - // Initialize a new stream. This fails if the connection is at capacity. - let stream_id = me.actions.send.open(&mut me.counts)?; + me.actions.send.ensure_next_stream_id()?; + + // The `pending` argument is provided by the `Client`, and holds + // a store `Key` of a `Stream` that may have been not been opened + // yet. + // + // If that stream is still pending, the Client isn't allowed to + // queue up another pending stream. They should use `poll_ready`. + if let Some(key) = pending { + if me.store.resolve(*key).is_pending_open { + return Err(UserError::Rejected.into()); + } + } + + let stream_id = me.actions.send.open()?; let mut stream = Stream::new( stream_id, @@ -354,9 +368,12 @@ where let mut stream = me.store.insert(stream.id, stream); - me.actions - .send - .send_headers(headers, &mut stream, &mut me.actions.task)?; + me.actions.send.send_headers( + headers, + &mut stream, + &mut me.counts, + &mut me.actions.task, + )?; // Given that the stream has been initialized, it should not be in the // closed state. @@ -403,13 +420,21 @@ impl Streams where B: Buf, { - pub fn poll_send_request_ready(&mut self) -> Poll<(), ::Error> { + pub fn poll_pending_open(&mut self, key: Option<&store::Key>) -> Poll<(), ::Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; me.actions.send.ensure_next_stream_id()?; - Ok(me.counts.poll_open_ready()) + if let Some(key) = key { + let mut stream = me.store.resolve(*key); + trace!("poll_pending_open; stream = {:?}", stream.is_pending_open); + if stream.is_pending_open { + stream.send_task = Some(task::current()); + return Ok(Async::NotReady); + } + } + Ok(().into()) } } @@ -430,6 +455,19 @@ where } } +// no derive because we don't need B and P to be Clone. +impl Clone for Streams +where + B: Buf, + P: Peer, +{ + fn clone(&self) -> Self { + Streams { + inner: self.inner.clone(), + } + } +} + // ===== impl StreamRef ===== impl StreamRef @@ -493,10 +531,12 @@ where let stream = me.store.resolve(self.key); let actions = &mut me.actions; - me.counts.transition(stream, |_, stream| { + me.counts.transition(stream, |counts, stream| { let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream); - actions.send.send_headers(frame, stream, &mut actions.task) + actions + .send + .send_headers(frame, stream, counts, &mut actions.task) }) } @@ -569,6 +609,10 @@ where me.actions.send.poll_capacity(&mut stream) } + + pub(crate) fn key(&self) -> store::Key { + self.key + } } impl StreamRef @@ -603,6 +647,12 @@ where me.actions.recv.poll_response(&mut stream) } + + + pub fn is_pending_open(&self) -> bool { + let mut me = self.inner.lock().unwrap(); + me.store.resolve(self.key).is_pending_open + } } impl Clone for StreamRef @@ -625,7 +675,15 @@ where P: Peer, { fn drop(&mut self) { - let mut me = self.inner.lock().unwrap(); + let mut me = match self.inner.lock() { + Ok(inner) => inner, + Err(_) => if ::std::thread::panicking() { + trace!("StreamRef::drop; mutex poisoned"); + return; + } else { + panic!("StreamRef::drop; mutex poisoned"); + }, + }; let me = &mut *me; diff --git a/tests/client_request.rs b/tests/client_request.rs index 2f3bffe..8552175 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -13,7 +13,7 @@ fn handshake() { .write(SETTINGS_ACK) .build(); - let h2 = Client::handshake(mock).wait().unwrap(); + let (_, h2) = Client::handshake(mock).wait().unwrap(); trace!("hands have been shook"); @@ -21,6 +21,43 @@ fn handshake() { h2.wait().unwrap(); } +#[test] +fn client_other_thread() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .send_frame(frames::headers(1).response(200).eos()) + .close(); + + let h2 = Client::handshake(io) + .expect("handshake") + .and_then(|(mut client, h2)| { + ::std::thread::spawn(move || { + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + let res = client + .send_request(request, true) + .unwrap() + .wait() + .expect("request"); + assert_eq!(res.status(), StatusCode::OK); + }); + + h2.expect("h2") + }); + h2.join(srv).wait().expect("wait"); +} + #[test] fn recv_invalid_server_stream_id() { let _ = ::env_logger::init(); @@ -39,7 +76,7 @@ fn recv_invalid_server_stream_id() { .write(&[0, 0, 8, 7, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]) .build(); - let mut h2 = Client::handshake(mock).wait().unwrap(); + let (mut client, h2) = Client::handshake(mock).wait().unwrap(); // Send the request let request = Request::builder() @@ -48,7 +85,7 @@ fn recv_invalid_server_stream_id() { .unwrap(); info!("sending request"); - let stream = h2.send_request(request, true).unwrap(); + let stream = client.send_request(request, true).unwrap(); // The connection errors assert!(h2.wait().is_err()); @@ -67,7 +104,7 @@ fn request_stream_id_overflows() { .initial_stream_id(::std::u32::MAX >> 1) .handshake::<_, Bytes>(io) .expect("handshake") - .and_then(|mut h2| { + .and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::GET) .uri("https://example.com/") @@ -75,24 +112,26 @@ fn request_stream_id_overflows() { .unwrap(); // first request is allowed - let req = h2.send_request(request, true).unwrap().unwrap(); + let req = client.send_request(request, true).unwrap().unwrap(); - let request = Request::builder() - .method(Method::GET) - .uri("https://example.com/") - .body(()) - .unwrap(); + h2.drive(req).and_then(move |(h2, _)| { + let request = Request::builder() + .method(Method::GET) + .uri("https://example.com/") + .body(()) + .unwrap(); - // second cannot use the next stream id, it's over + // second cannot use the next stream id, it's over - let poll_err = h2.poll_ready().unwrap_err(); - assert_eq!(poll_err.to_string(), "user error: stream ID overflowed"); + let poll_err = client.poll_ready().unwrap_err(); + assert_eq!(poll_err.to_string(), "user error: stream ID overflowed"); - let err = h2.send_request(request, true).unwrap_err(); - assert_eq!(err.to_string(), "user error: stream ID overflowed"); + let err = client.send_request(request, true).unwrap_err(); + assert_eq!(err.to_string(), "user error: stream ID overflowed"); - h2.expect("h2").join(req) + h2.expect("h2") + }) }); let srv = srv.assert_client_handshake() @@ -120,13 +159,27 @@ fn request_over_max_concurrent_streams_errors() { .max_concurrent_streams(1)) .unwrap() .recv_settings() - .recv_frame(frames::headers(1).request("POST", "https://example.com/")) - .send_frame(frames::headers(1).response(200)) + .recv_frame( + frames::headers(1) + .request("POST", "https://example.com/") + .eos(), + ) + .send_frame(frames::headers(1).response(200).eos()) + .recv_frame(frames::headers(3).request("POST", "https://example.com/")) + .send_frame(frames::headers(3).response(200)) + .recv_frame(frames::data(3, "hello").eos()) + .send_frame(frames::data(3, "").eos()) + .recv_frame(frames::headers(5).request("POST", "https://example.com/")) + .send_frame(frames::headers(5).response(200)) + .recv_frame(frames::data(5, "hello").eos()) + .send_frame(frames::data(5, "").eos()) .close(); let h2 = Client::handshake(io) .expect("handshake") - .and_then(|mut h2| { + .and_then(|(mut client, h2)| { + // we send a simple req here just to drive the connection so we can + // receive the server settings. let request = Request::builder() .method(Method::POST) .uri("https://example.com/") @@ -134,28 +187,48 @@ fn request_over_max_concurrent_streams_errors() { .unwrap(); // first request is allowed - let req = h2.send_request(request, false).unwrap().unwrap(); - - // drive the connection some so we can receive the server settings - h2.drive(req) + let req = client.send_request(request, true).unwrap().unwrap(); + h2.drive(req).map(move |(h2, _)| (client, h2)) }) - .and_then(|(mut h2, _)| { + .and_then(|(mut client, h2)| { + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + + // first request is allowed + let mut req = client.send_request(request, false).unwrap(); + + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + + // second request is put into pending_open + let mut req2 = client.send_request(request, false).unwrap(); + let request = Request::builder() .method(Method::GET) .uri("https://example.com/") .body(()) .unwrap(); - // second stream is over max concurrent - assert!(h2.poll_ready().expect("poll_ready").is_not_ready()); + // third stream is over max concurrent + assert!(client.poll_ready().expect("poll_ready").is_not_ready()); - let err = h2.send_request(request, true).unwrap_err(); + let err = client.send_request(request, true).unwrap_err(); assert_eq!(err.to_string(), "user error: rejected"); - h2.expect("h2") + req.send_data("hello".into(), true).expect("req send_data"); + h2.drive(req.expect("req")).and_then(move |(h2, _)| { + req2.send_data("hello".into(), true) + .expect("req2 send_data"); + h2.expect("h2").join(req2.expect("req2")) + }) }); - h2.join(srv).wait().expect("wait"); } diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 6c7ec83..f25b920 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -27,7 +27,7 @@ fn send_data_without_requesting_capacity() { .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) .build(); - let mut h2 = Client::handshake(mock).wait().unwrap(); + let (mut client, mut h2) = Client::handshake(mock).wait().unwrap(); let request = Request::builder() .method(Method::POST) @@ -35,7 +35,7 @@ fn send_data_without_requesting_capacity() { .body(()) .unwrap(); - let mut stream = h2.send_request(request, false).unwrap(); + let mut stream = client.send_request(request, false).unwrap(); // The capacity should be immediately allocated assert_eq!(stream.capacity(), 0); @@ -82,14 +82,14 @@ fn release_capacity_sends_window_update() { // gotta end the connection .map(drop); - let h2 = Client::handshake(io).unwrap().and_then(|mut h2| { + let h2 = Client::handshake(io).unwrap().and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::GET) .uri("https://http2.akamai.com/") .body(()) .unwrap(); - let req = h2.send_request(request, true).unwrap() + let req = client.send_request(request, true).unwrap() .unwrap() // Get the response .and_then(|resp| { @@ -145,14 +145,14 @@ fn release_capacity_of_small_amount_does_not_send_window_update() { // gotta end the connection .map(drop); - let h2 = Client::handshake(io).unwrap().and_then(|mut h2| { + let h2 = Client::handshake(io).unwrap().and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::GET) .uri("https://http2.akamai.com/") .body(()) .unwrap(); - let req = h2.send_request(request, true).unwrap() + let req = client.send_request(request, true).unwrap() .unwrap() // Get the response .and_then(|resp| { @@ -212,14 +212,15 @@ fn recv_data_overflows_connection_window() { .recv_frame(frames::go_away(0).flow_control()); // connection is ended by client - let h2 = Client::handshake(io).unwrap().and_then(|mut h2| { + let h2 = Client::handshake(io).unwrap().and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::GET) .uri("https://http2.akamai.com/") .body(()) .unwrap(); - let req = h2.send_request(request, true) + let req = client + .send_request(request, true) .unwrap() .unwrap() .and_then(|resp| { @@ -281,14 +282,15 @@ fn recv_data_overflows_stream_window() { .initial_window_size(16_384) .handshake::<_, Bytes>(io) .unwrap() - .and_then(|mut h2| { + .and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::GET) .uri("https://http2.akamai.com/") .body(()) .unwrap(); - let req = h2.send_request(request, true) + let req = client + .send_request(request, true) .unwrap() .unwrap() .and_then(|resp| { @@ -333,7 +335,7 @@ fn stream_close_by_data_frame_releases_capacity() { let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE as usize; - let h2 = Client::handshake(io).unwrap().and_then(|mut h2| { + let h2 = Client::handshake(io).unwrap().and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::POST) .uri("https://http2.akamai.com/") @@ -341,7 +343,7 @@ fn stream_close_by_data_frame_releases_capacity() { .unwrap(); // Send request - let mut s1 = h2.send_request(request, false).unwrap(); + let mut s1 = client.send_request(request, false).unwrap(); // This effectively reserves the entire connection window s1.reserve_capacity(window_size); @@ -357,7 +359,7 @@ fn stream_close_by_data_frame_releases_capacity() { .unwrap(); // Create a second stream - let mut s2 = h2.send_request(request, false).unwrap(); + let mut s2 = client.send_request(request, false).unwrap(); // Request capacity s2.reserve_capacity(5); @@ -401,7 +403,7 @@ fn stream_close_by_trailers_frame_releases_capacity() { let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE as usize; - let h2 = Client::handshake(io).unwrap().and_then(|mut h2| { + let h2 = Client::handshake(io).unwrap().and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::POST) .uri("https://http2.akamai.com/") @@ -409,7 +411,7 @@ fn stream_close_by_trailers_frame_releases_capacity() { .unwrap(); // Send request - let mut s1 = h2.send_request(request, false).unwrap(); + let mut s1 = client.send_request(request, false).unwrap(); // This effectively reserves the entire connection window s1.reserve_capacity(window_size); @@ -425,7 +427,7 @@ fn stream_close_by_trailers_frame_releases_capacity() { .unwrap(); // Create a second stream - let mut s2 = h2.send_request(request, false).unwrap(); + let mut s2 = client.send_request(request, false).unwrap(); // Request capacity s2.reserve_capacity(5); @@ -504,14 +506,14 @@ fn recv_window_update_on_stream_closed_by_data_frame() { let h2 = Client::handshake(io) .unwrap() - .and_then(|mut h2| { + .and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::POST) .uri("https://http2.akamai.com/") .body(()) .unwrap(); - let stream = h2.send_request(request, false).unwrap(); + let stream = client.send_request(request, false).unwrap(); // Wait for the response h2.drive(GetResponse { @@ -547,14 +549,14 @@ fn reserved_capacity_assigned_in_multi_window_updates() { let h2 = Client::handshake(io) .unwrap() - .and_then(|mut h2| { + .and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::POST) .uri("https://http2.akamai.com/") .body(()) .unwrap(); - let mut stream = h2.send_request(request, false).unwrap(); + let mut stream = client.send_request(request, false).unwrap(); // Consume the capacity let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; @@ -674,17 +676,19 @@ fn connection_notified_on_released_capacity() { let th2 = thread::spawn(move || { - let h2 = Client::handshake(io).wait().unwrap(); + let (mut client, h2) = Client::handshake(io).wait().unwrap(); - let (mut h2, _) = h2.drive(settings_rx).wait().unwrap(); + let (h2, _) = h2.drive(settings_rx).wait().unwrap(); let request = Request::get("https://example.com/a").body(()).unwrap(); - tx.send(h2.send_request(request, true).unwrap()).unwrap(); + tx.send(client.send_request(request, true).unwrap()) + .unwrap(); let request = Request::get("https://example.com/b").body(()).unwrap(); - tx.send(h2.send_request(request, true).unwrap()).unwrap(); + tx.send(client.send_request(request, true).unwrap()) + .unwrap(); // Run the connection to completion h2.wait().unwrap(); diff --git a/tests/ping_pong.rs b/tests/ping_pong.rs index 9436f6c..cf4869f 100644 --- a/tests/ping_pong.rs +++ b/tests/ping_pong.rs @@ -8,7 +8,9 @@ fn recv_single_ping() { let (m, mock) = mock::new(); // Create the handshake - let h2 = Client::handshake(m).unwrap().and_then(|conn| conn.unwrap()); + let h2 = Client::handshake(m) + .unwrap() + .and_then(|(_, conn)| conn.unwrap()); let mock = mock.assert_client_handshake() .unwrap() diff --git a/tests/prioritization.rs b/tests/prioritization.rs index 761484f..b3a6c11 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -25,7 +25,7 @@ fn single_stream_send_large_body() { .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) .build(); - let mut h2 = Client::handshake(mock).wait().unwrap(); + let (mut client, mut h2) = Client::handshake(mock).wait().unwrap(); let request = Request::builder() .method(Method::POST) @@ -33,7 +33,7 @@ fn single_stream_send_large_body() { .body(()) .unwrap(); - let mut stream = h2.send_request(request, false).unwrap(); + let mut stream = client.send_request(request, false).unwrap(); // Reserve capacity to send the payload stream.reserve_capacity(payload.len()); @@ -79,7 +79,7 @@ fn single_stream_send_extra_large_body_multi_frames_one_buffer() { .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) .build(); - let mut h2 = Client::handshake(mock).wait().unwrap(); + let (mut client, mut h2) = Client::handshake(mock).wait().unwrap(); let request = Request::builder() .method(Method::POST) @@ -87,7 +87,7 @@ fn single_stream_send_extra_large_body_multi_frames_one_buffer() { .body(()) .unwrap(); - let mut stream = h2.send_request(request, false).unwrap(); + let mut stream = client.send_request(request, false).unwrap(); stream.reserve_capacity(payload.len()); @@ -144,7 +144,7 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) .build(); - let mut h2 = Client::handshake(mock).wait().unwrap(); + let (mut client, mut h2) = Client::handshake(mock).wait().unwrap(); let request = Request::builder() .method(Method::POST) @@ -152,7 +152,7 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { .body(()) .unwrap(); - let mut stream = h2.send_request(request, false).unwrap(); + let mut stream = client.send_request(request, false).unwrap(); stream.reserve_capacity(payload.len()); @@ -177,7 +177,7 @@ fn send_data_receive_window_update() { let h2 = Client::handshake(m) .unwrap() - .and_then(|mut h2| { + .and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::POST) .uri("https://http2.akamai.com/") @@ -185,7 +185,7 @@ fn send_data_receive_window_update() { .unwrap(); // Send request - let mut stream = h2.send_request(request, false).unwrap(); + let mut stream = client.send_request(request, false).unwrap(); // Send data frame stream.send_data("hello".into(), false).unwrap(); diff --git a/tests/push_promise.rs b/tests/push_promise.rs index 2d6d1d5..f8bba00 100644 --- a/tests/push_promise.rs +++ b/tests/push_promise.rs @@ -20,13 +20,14 @@ fn recv_push_works() { .send_frame(frames::headers(1).response(200).eos()) .send_frame(frames::headers(2).response(200).eos()); - let h2 = Client::handshake(io).unwrap().and_then(|mut h2| { + let h2 = Client::handshake(io).unwrap().and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::GET) .uri("https://http2.akamai.com/") .body(()) .unwrap(); - let req = h2.send_request(request, true) + let req = client + .send_request(request, true) .unwrap() .unwrap() .and_then(|resp| { @@ -61,13 +62,13 @@ fn recv_push_when_push_disabled_is_conn_error() { .enable_push(false) .handshake::<_, Bytes>(io) .unwrap() - .and_then(|mut h2| { + .and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::GET) .uri("https://http2.akamai.com/") .body(()) .unwrap(); - let req = h2.send_request(request, true).unwrap().then(|res| { + let req = client.send_request(request, true).unwrap().then(|res| { let err = res.unwrap_err(); assert_eq!( err.to_string(), diff --git a/tests/stream_states.rs b/tests/stream_states.rs index e62bfcd..30f31e1 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -20,7 +20,7 @@ fn send_recv_headers_only() { .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) .build(); - let mut h2 = Client::handshake(mock).wait().unwrap(); + let (mut client, mut h2) = Client::handshake(mock).wait().unwrap(); // Send the request let request = Request::builder() @@ -29,7 +29,7 @@ fn send_recv_headers_only() { .unwrap(); info!("sending request"); - let mut stream = h2.send_request(request, true).unwrap(); + let mut stream = client.send_request(request, true).unwrap(); let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT); @@ -62,7 +62,7 @@ fn send_recv_data() { ]) .build(); - let mut h2 = Client::builder().handshake(mock).wait().unwrap(); + let (mut client, mut h2) = Client::builder().handshake(mock).wait().unwrap(); let request = Request::builder() .method(Method::POST) @@ -71,7 +71,7 @@ fn send_recv_data() { .unwrap(); info!("sending request"); - let mut stream = h2.send_request(request, false).unwrap(); + let mut stream = client.send_request(request, false).unwrap(); // Reserve send capacity stream.reserve_capacity(5); @@ -119,7 +119,7 @@ fn send_headers_recv_data_single_frame() { ]) .build(); - let mut h2 = Client::handshake(mock).wait().unwrap(); + let (mut client, mut h2) = Client::handshake(mock).wait().unwrap(); // Send the request let request = Request::builder() @@ -128,7 +128,7 @@ fn send_headers_recv_data_single_frame() { .unwrap(); info!("sending request"); - let mut stream = h2.send_request(request, true).unwrap(); + let mut stream = client.send_request(request, true).unwrap(); let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); assert_eq!(resp.status(), StatusCode::OK); @@ -154,32 +154,29 @@ fn closed_streams_are_released() { let _ = ::env_logger::init(); let (io, srv) = mock::new(); - let h2 = Client::handshake(io) - .unwrap() - .and_then(|mut h2| { - let request = Request::get("https://example.com/").body(()).unwrap(); + let h2 = Client::handshake(io).unwrap().and_then(|(mut client, h2)| { + let request = Request::get("https://example.com/").body(()).unwrap(); - // Send request - let stream = h2.send_request(request, true).unwrap(); - h2.drive(stream) - }) - .and_then(|(h2, response)| { + // Send request + let stream = client.send_request(request, true).unwrap(); + h2.drive(stream).and_then(move |(_, response)| { assert_eq!(response.status(), StatusCode::NO_CONTENT); // There are no active streams - assert_eq!(0, h2.num_active_streams()); + assert_eq!(0, client.num_active_streams()); // The response contains a handle for the body. This keeps the // stream wired. - assert_eq!(1, h2.num_wired_streams()); + assert_eq!(1, client.num_wired_streams()); drop(response); // The stream state is now free - assert_eq!(0, h2.num_wired_streams()); + assert_eq!(0, client.num_wired_streams()); Ok(()) - }); + }) + }); let srv = srv.assert_client_handshake() .unwrap() @@ -200,14 +197,15 @@ fn errors_if_recv_frame_exceeds_max_frame_size() { let _ = ::env_logger::init(); let (io, mut srv) = mock::new(); - let h2 = Client::handshake(io).unwrap().and_then(|mut h2| { + let h2 = Client::handshake(io).unwrap().and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::GET) .uri("https://example.com/") .body(()) .unwrap(); - let req = h2.send_request(request, true) + let req = client + .send_request(request, true) .unwrap() .unwrap() .and_then(|resp| { @@ -258,14 +256,15 @@ fn configure_max_frame_size() { .max_frame_size(16_384 * 2) .handshake::<_, Bytes>(io) .expect("handshake") - .and_then(|mut h2| { + .and_then(|(mut client, h2)| { let request = Request::builder() .method(Method::GET) .uri("https://example.com/") .body(()) .unwrap(); - let req = h2.send_request(request, true) + let req = client + .send_request(request, true) .unwrap() .expect("response") .and_then(|resp| { diff --git a/tests/support/mock.rs b/tests/support/mock.rs index ee59fe6..d1dc224 100644 --- a/tests/support/mock.rs +++ b/tests/support/mock.rs @@ -480,7 +480,7 @@ where Async::Ready((frame, handle)) => (frame, handle), Async::NotReady => return Ok(Async::NotReady), }; - assert_eq!(frame.unwrap(), self.frame); + assert_eq!(frame.unwrap(), self.frame, "recv_frame"); Ok(Async::Ready(handle)) } } diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 8353070..f05a580 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -1,5 +1,11 @@ //! Utilities to support tests. +#[cfg(not(feature = "unstable"))] +compile_error!( + "Tests depend on the 'unstable' feature on h2. \ + Retry with `cargo test --features unstable`" +); + pub extern crate bytes; pub extern crate env_logger; pub extern crate futures; diff --git a/tests/support/prelude.rs b/tests/support/prelude.rs index 92821ca..22314e9 100644 --- a/tests/support/prelude.rs +++ b/tests/support/prelude.rs @@ -60,7 +60,7 @@ pub trait ClientExt { fn run(&mut self, f: F) -> Result; } -impl ClientExt for Client +impl ClientExt for client::Connection where T: AsyncRead + AsyncWrite + 'static, B: IntoBuf + 'static, diff --git a/tests/trailers.rs b/tests/trailers.rs index 237d0b8..fb3d690 100644 --- a/tests/trailers.rs +++ b/tests/trailers.rs @@ -23,7 +23,7 @@ fn recv_trailers_only() { ]) .build(); - let mut h2 = Client::handshake(mock).wait().unwrap(); + let (mut client, mut h2) = Client::handshake(mock).wait().unwrap(); // Send the request let request = Request::builder() @@ -32,7 +32,7 @@ fn recv_trailers_only() { .unwrap(); info!("sending request"); - let mut stream = h2.send_request(request, true).unwrap(); + let mut stream = client.send_request(request, true).unwrap(); let response = h2.run(poll_fn(|| stream.poll_response())).unwrap(); assert_eq!(response.status(), StatusCode::OK); @@ -71,7 +71,7 @@ fn send_trailers_immediately() { ]) .build(); - let mut h2 = Client::handshake(mock).wait().unwrap(); + let (mut client, mut h2) = Client::handshake(mock).wait().unwrap(); // Send the request let request = Request::builder() @@ -80,7 +80,7 @@ fn send_trailers_immediately() { .unwrap(); info!("sending request"); - let mut stream = h2.send_request(request, false).unwrap(); + let mut stream = client.send_request(request, false).unwrap(); let mut trailers = HeaderMap::new(); trailers.insert("zomg", "hello".parse().unwrap());