add set_target_window_size methods to Server and Client (#149)
Closes #101
This commit is contained in:
		| @@ -243,6 +243,21 @@ impl Default for Builder { | |||||||
|  |  | ||||||
| // ===== impl Connection ===== | // ===== impl Connection ===== | ||||||
|  |  | ||||||
|  |  | ||||||
|  | impl<T, B> Connection<T, B> | ||||||
|  | where | ||||||
|  |     T: AsyncRead + AsyncWrite, | ||||||
|  |     B: IntoBuf, | ||||||
|  | { | ||||||
|  |     /// Sets the target window size for the whole connection. | ||||||
|  |     /// | ||||||
|  |     /// Default in HTTP2 is 65_535. | ||||||
|  |     pub fn set_target_window_size(&mut self, size: u32) { | ||||||
|  |         assert!(size <= proto::MAX_WINDOW_SIZE); | ||||||
|  |         self.inner.set_target_window_size(size); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| impl<T, B> Future for Connection<T, B> | impl<T, B> Future for Connection<T, B> | ||||||
| where | where | ||||||
|     T: AsyncRead + AsyncWrite, |     T: AsyncRead + AsyncWrite, | ||||||
|   | |||||||
| @@ -91,6 +91,10 @@ where | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn set_target_window_size(&mut self, size: WindowSize) { | ||||||
|  |         self.streams.set_target_connection_window_size(size); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /// Returns `Ready` when the connection is ready to receive a frame. |     /// Returns `Ready` when the connection is ready to receive a frame. | ||||||
|     /// |     /// | ||||||
|     /// Returns `RecvError` as this may raise errors that are caused by delayed |     /// Returns `RecvError` as this may raise errors that are caused by delayed | ||||||
|   | |||||||
| @@ -1,6 +1,8 @@ | |||||||
| use frame::Reason; | use frame::Reason; | ||||||
| use proto::{WindowSize, MAX_WINDOW_SIZE}; | use proto::{WindowSize, MAX_WINDOW_SIZE}; | ||||||
|  |  | ||||||
|  | use std::fmt; | ||||||
|  |  | ||||||
| // We don't want to send WINDOW_UPDATE frames for tiny changes, but instead | // We don't want to send WINDOW_UPDATE frames for tiny changes, but instead | ||||||
| // aggregate them when the changes are significant. Many implementations do | // aggregate them when the changes are significant. Many implementations do | ||||||
| // this by keeping a "ratio" of the update version the allowed window size. | // this by keeping a "ratio" of the update version the allowed window size. | ||||||
| @@ -25,32 +27,41 @@ fn sanity_unclaimed_ratio() { | |||||||
|  |  | ||||||
| #[derive(Copy, Clone, Debug)] | #[derive(Copy, Clone, Debug)] | ||||||
| pub struct FlowControl { | pub struct FlowControl { | ||||||
|     /// Window size as indicated by the peer. This can go negative. |     /// Window the peer knows about. | ||||||
|     window_size: i32, |     /// | ||||||
|  |     /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received. | ||||||
|  |     /// | ||||||
|  |     /// For example, say the peer sends a request and uses 32kb of the window. | ||||||
|  |     /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust | ||||||
|  |     /// its understanding of the capacity of the window, and that would be: | ||||||
|  |     /// | ||||||
|  |     /// ```notrust | ||||||
|  |     /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb | ||||||
|  |     /// ``` | ||||||
|  |     window_size: Window, | ||||||
|  |  | ||||||
|     /// The amount of the window that is currently available to consume. |     /// Window that we know about. | ||||||
|     available: WindowSize, |     /// | ||||||
|  |     /// This can go negative if a user declares a smaller target window than | ||||||
|  |     /// the peer knows about. | ||||||
|  |     available: Window, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl FlowControl { | impl FlowControl { | ||||||
|     pub fn new() -> FlowControl { |     pub fn new() -> FlowControl { | ||||||
|         FlowControl { |         FlowControl { | ||||||
|             window_size: 0, |             window_size: Window(0), | ||||||
|             available: 0, |             available: Window(0), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Returns the window size as known by the peer |     /// Returns the window size as known by the peer | ||||||
|     pub fn window_size(&self) -> WindowSize { |     pub fn window_size(&self) -> WindowSize { | ||||||
|         if self.window_size < 0 { |         self.window_size.as_size() | ||||||
|             0 |  | ||||||
|         } else { |  | ||||||
|             self.window_size as WindowSize |  | ||||||
|         } |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Returns the window size available to the consumer |     /// Returns the window size available to the consumer | ||||||
|     pub fn available(&self) -> WindowSize { |     pub fn available(&self) -> Window { | ||||||
|         self.available |         self.available | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -60,11 +71,10 @@ impl FlowControl { | |||||||
|             return false; |             return false; | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         self.window_size as WindowSize > self.available |         self.window_size > self.available | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn claim_capacity(&mut self, capacity: WindowSize) { |     pub fn claim_capacity(&mut self, capacity: WindowSize) { | ||||||
|         assert!(self.available >= capacity); |  | ||||||
|         self.available -= capacity; |         self.available -= capacity; | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -80,14 +90,14 @@ impl FlowControl { | |||||||
|     /// |     /// | ||||||
|     /// This represents pending outbound WINDOW_UPDATE frames. |     /// This represents pending outbound WINDOW_UPDATE frames. | ||||||
|     pub fn unclaimed_capacity(&self) -> Option<WindowSize> { |     pub fn unclaimed_capacity(&self) -> Option<WindowSize> { | ||||||
|         let available = self.available as i32; |         let available = self.available; | ||||||
|  |  | ||||||
|         if self.window_size >= available { |         if self.window_size >= available { | ||||||
|             return None; |             return None; | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         let unclaimed = available - self.window_size; |         let unclaimed = available.0 - self.window_size.0; | ||||||
|         let threshold = self.window_size / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR; |         let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR; | ||||||
|  |  | ||||||
|         if unclaimed < threshold { |         if unclaimed < threshold { | ||||||
|             None |             None | ||||||
| @@ -100,7 +110,7 @@ impl FlowControl { | |||||||
|     /// |     /// | ||||||
|     /// This is called after receiving a WINDOW_UPDATE frame |     /// This is called after receiving a WINDOW_UPDATE frame | ||||||
|     pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> { |     pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> { | ||||||
|         let (val, overflow) = self.window_size.overflowing_add(sz as i32); |         let (val, overflow) = self.window_size.0.overflowing_add(sz as i32); | ||||||
|  |  | ||||||
|         if overflow { |         if overflow { | ||||||
|             return Err(Reason::FLOW_CONTROL_ERROR); |             return Err(Reason::FLOW_CONTROL_ERROR); | ||||||
| @@ -117,7 +127,7 @@ impl FlowControl { | |||||||
|             val |             val | ||||||
|         ); |         ); | ||||||
|  |  | ||||||
|         self.window_size = val; |         self.window_size = Window(val); | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -133,8 +143,7 @@ impl FlowControl { | |||||||
|             self.available |             self.available | ||||||
|         ); |         ); | ||||||
|         // This should not be able to overflow `window_size` from the bottom. |         // This should not be able to overflow `window_size` from the bottom. | ||||||
|         self.window_size -= sz as i32; |         self.window_size -= sz; | ||||||
|         self.available = self.available.saturating_sub(sz); |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Decrements the window reflecting data has actually been sent. The caller |     /// Decrements the window reflecting data has actually been sent. The caller | ||||||
| @@ -148,10 +157,98 @@ impl FlowControl { | |||||||
|         ); |         ); | ||||||
|  |  | ||||||
|         // Ensure that the argument is correct |         // Ensure that the argument is correct | ||||||
|         assert!(sz <= self.window_size as WindowSize); |         assert!(sz <= self.window_size); | ||||||
|  |  | ||||||
|         // Update values |         // Update values | ||||||
|         self.window_size -= sz as i32; |         self.window_size -= sz; | ||||||
|         self.available -= sz; |         self.available -= sz; | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /// The current capacity of a flow-controlled Window. | ||||||
|  | /// | ||||||
|  | /// This number can go negative when either side has used a certain amount | ||||||
|  | /// of capacity when the other side advertises a reduction in size. | ||||||
|  | /// | ||||||
|  | /// This type tries to centralize the knowledge of addition and subtraction | ||||||
|  | /// to this capacity, instead of having integer casts throughout the source. | ||||||
|  | #[derive(Clone, Copy, Debug, PartialEq, PartialOrd)] | ||||||
|  | pub struct Window(i32); | ||||||
|  |  | ||||||
|  | impl Window { | ||||||
|  |     pub fn as_size(&self) -> WindowSize { | ||||||
|  |         if self.0 < 0 { | ||||||
|  |             0 | ||||||
|  |         } else { | ||||||
|  |             self.0 as WindowSize | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn checked_size(&self) -> WindowSize { | ||||||
|  |         assert!(self.0 >= 0, "negative Window"); | ||||||
|  |         self.0 as WindowSize | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl PartialEq<WindowSize> for Window { | ||||||
|  |     fn eq(&self, other: &WindowSize) -> bool { | ||||||
|  |         if self.0 < 0 { | ||||||
|  |             false | ||||||
|  |         } else { | ||||||
|  |             (self.0 as WindowSize).eq(other) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  |  | ||||||
|  | impl PartialEq<Window> for WindowSize { | ||||||
|  |     fn eq(&self, other: &Window) -> bool { | ||||||
|  |         other.eq(self) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl PartialOrd<WindowSize> for Window { | ||||||
|  |     fn partial_cmp(&self, other: &WindowSize) -> Option<::std::cmp::Ordering> { | ||||||
|  |         if self.0 < 0 { | ||||||
|  |             Some(::std::cmp::Ordering::Less) | ||||||
|  |         } else { | ||||||
|  |             (self.0 as WindowSize).partial_cmp(other) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl PartialOrd<Window> for WindowSize { | ||||||
|  |     fn partial_cmp(&self, other: &Window) -> Option<::std::cmp::Ordering> { | ||||||
|  |         if other.0 < 0 { | ||||||
|  |             Some(::std::cmp::Ordering::Greater) | ||||||
|  |         } else { | ||||||
|  |             self.partial_cmp(&(other.0 as WindowSize)) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  |  | ||||||
|  | impl ::std::ops::SubAssign<WindowSize> for Window { | ||||||
|  |     fn sub_assign(&mut self, other: WindowSize) { | ||||||
|  |         self.0 -= other as i32; | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl ::std::ops::Add<WindowSize> for Window { | ||||||
|  |     type Output = Self; | ||||||
|  |     fn add(self, other: WindowSize) -> Self::Output { | ||||||
|  |         Window(self.0 + other as i32) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl ::std::ops::AddAssign<WindowSize> for Window { | ||||||
|  |     fn add_assign(&mut self, other: WindowSize) { | ||||||
|  |         self.0 += other as i32; | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl fmt::Display for Window { | ||||||
|  |     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||||
|  |         fmt::Display::fmt(&self.0, f) | ||||||
|  |     } | ||||||
|  | } | ||||||
|   | |||||||
| @@ -187,7 +187,7 @@ where | |||||||
|             stream.requested_send_capacity = capacity; |             stream.requested_send_capacity = capacity; | ||||||
|  |  | ||||||
|             // Currently available capacity assigned to the stream |             // Currently available capacity assigned to the stream | ||||||
|             let available = stream.send_flow.available(); |             let available = stream.send_flow.available().as_size(); | ||||||
|  |  | ||||||
|             // If the stream has more assigned capacity than requested, reclaim |             // If the stream has more assigned capacity than requested, reclaim | ||||||
|             // some for the connection |             // some for the connection | ||||||
| @@ -275,9 +275,9 @@ where | |||||||
|         // The amount of additional capacity that the stream requests. |         // The amount of additional capacity that the stream requests. | ||||||
|         // Don't assign more than the window has available! |         // Don't assign more than the window has available! | ||||||
|         let additional = cmp::min( |         let additional = cmp::min( | ||||||
|             total_requested - stream.send_flow.available(), |             total_requested - stream.send_flow.available().as_size(), | ||||||
|             // Can't assign more than what is available |             // Can't assign more than what is available | ||||||
|             stream.send_flow.window_size() - stream.send_flow.available(), |             stream.send_flow.window_size() - stream.send_flow.available().as_size(), | ||||||
|         ); |         ); | ||||||
|  |  | ||||||
|         trace!( |         trace!( | ||||||
| @@ -304,7 +304,7 @@ where | |||||||
|         ); |         ); | ||||||
|  |  | ||||||
|         // The amount of currently available capacity on the connection |         // The amount of currently available capacity on the connection | ||||||
|         let conn_available = self.flow.available(); |         let conn_available = self.flow.available().as_size(); | ||||||
|  |  | ||||||
|         // First check if capacity is immediately available |         // First check if capacity is immediately available | ||||||
|         if conn_available > 0 { |         if conn_available > 0 { | ||||||
| @@ -550,7 +550,7 @@ where | |||||||
|                             let len = cmp::min(sz, max_len); |                             let len = cmp::min(sz, max_len); | ||||||
|  |  | ||||||
|                             // Only send up to the stream's window capacity |                             // Only send up to the stream's window capacity | ||||||
|                             let len = cmp::min(len, stream_capacity as usize) as WindowSize; |                             let len = cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; | ||||||
|  |  | ||||||
|                             // There *must* be be enough connection level |                             // There *must* be be enough connection level | ||||||
|                             // capacity at this point. |                             // capacity at this point. | ||||||
|   | |||||||
| @@ -20,6 +20,9 @@ where | |||||||
|     /// Connection level flow control governing received data |     /// Connection level flow control governing received data | ||||||
|     flow: FlowControl, |     flow: FlowControl, | ||||||
|  |  | ||||||
|  |     /// Amount of connection window capacity currently used by outstanding streams. | ||||||
|  |     in_flight_data: WindowSize, | ||||||
|  |  | ||||||
|     /// The lowest stream ID that is still idle |     /// The lowest stream ID that is still idle | ||||||
|     next_stream_id: Result<StreamId, StreamIdOverflow>, |     next_stream_id: Result<StreamId, StreamIdOverflow>, | ||||||
|  |  | ||||||
| @@ -75,6 +78,7 @@ where | |||||||
|         Recv { |         Recv { | ||||||
|             init_window_sz: config.local_init_window_sz, |             init_window_sz: config.local_init_window_sz, | ||||||
|             flow: flow, |             flow: flow, | ||||||
|  |             in_flight_data: 0 as WindowSize, | ||||||
|             next_stream_id: Ok(next_stream_id.into()), |             next_stream_id: Ok(next_stream_id.into()), | ||||||
|             pending_window_updates: store::Queue::new(), |             pending_window_updates: store::Queue::new(), | ||||||
|             last_processed_id: StreamId::zero(), |             last_processed_id: StreamId::zero(), | ||||||
| @@ -223,6 +227,7 @@ where | |||||||
|  |  | ||||||
|         // Decrement in-flight data |         // Decrement in-flight data | ||||||
|         stream.in_flight_recv_data -= capacity; |         stream.in_flight_recv_data -= capacity; | ||||||
|  |         self.in_flight_data -= capacity; | ||||||
|  |  | ||||||
|         // Assign capacity to connection & stream |         // Assign capacity to connection & stream | ||||||
|         self.flow.assign_capacity(capacity); |         self.flow.assign_capacity(capacity); | ||||||
| @@ -246,6 +251,48 @@ where | |||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Set the "target" connection window size. | ||||||
|  |     /// | ||||||
|  |     /// By default, all new connections start with 64kb of window size. As | ||||||
|  |     /// streams used and release capacity, we will send WINDOW_UPDATEs for the | ||||||
|  |     /// connection to bring it back up to the initial "target". | ||||||
|  |     /// | ||||||
|  |     /// Setting a target means that we will try to tell the peer about | ||||||
|  |     /// WINDOW_UPDATEs so the peer knows it has about `target` window to use | ||||||
|  |     /// for the whole conection. | ||||||
|  |     /// | ||||||
|  |     /// The `task` is an optional parked task for the `Connection` that might | ||||||
|  |     /// be blocked on needing more window capacity. | ||||||
|  |     pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Task>) { | ||||||
|  |         trace!( | ||||||
|  |             "set_target_connection_window; target={}; available={}, reserved={}", | ||||||
|  |             target, | ||||||
|  |             self.flow.available(), | ||||||
|  |             self.in_flight_data, | ||||||
|  |         ); | ||||||
|  |  | ||||||
|  |         // The current target connection window is our `available` plus any | ||||||
|  |         // in-flight data reserved by streams. | ||||||
|  |         // | ||||||
|  |         // Update the flow controller with the difference between the new | ||||||
|  |         // target and the current target. | ||||||
|  |         let current = (self.flow.available() + self.in_flight_data).checked_size(); | ||||||
|  |         if target > current { | ||||||
|  |             self.flow.assign_capacity(target - current); | ||||||
|  |         } else { | ||||||
|  |             self.flow.claim_capacity(current - target); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // If changing the target capacity means we gained a bunch of capacity, | ||||||
|  |         // enough that we went over the update threshold, then schedule sending | ||||||
|  |         // a connection WINDOW_UPDATE. | ||||||
|  |         if self.flow.unclaimed_capacity().is_some() { | ||||||
|  |             if let Some(task) = task.take() { | ||||||
|  |                 task.notify(); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn body_is_empty(&self, stream: &store::Ptr<B, P>) -> bool { |     pub fn body_is_empty(&self, stream: &store::Ptr<B, P>) -> bool { | ||||||
|         if !stream.state.is_recv_closed() { |         if !stream.state.is_recv_closed() { | ||||||
|             return false; |             return false; | ||||||
| @@ -298,6 +345,7 @@ where | |||||||
|  |  | ||||||
|         // Track the data as in-flight |         // Track the data as in-flight | ||||||
|         stream.in_flight_recv_data += sz; |         stream.in_flight_recv_data += sz; | ||||||
|  |         self.in_flight_data += sz; | ||||||
|  |  | ||||||
|         if stream.dec_content_length(frame.payload().len()).is_err() { |         if stream.dec_content_length(frame.payload().len()).is_err() { | ||||||
|             return Err(RecvError::Stream { |             return Err(RecvError::Stream { | ||||||
|   | |||||||
| @@ -6,7 +6,7 @@ use proto::*; | |||||||
|  |  | ||||||
| use bytes::Buf; | use bytes::Buf; | ||||||
|  |  | ||||||
| use std::io; | use std::{cmp, io}; | ||||||
|  |  | ||||||
| /// Manages state transitions related to outbound frames. | /// Manages state transitions related to outbound frames. | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| @@ -144,7 +144,7 @@ where | |||||||
|  |  | ||||||
|         // Reclaim all capacity assigned to the stream and re-assign it to the |         // Reclaim all capacity assigned to the stream and re-assign it to the | ||||||
|         // connection |         // connection | ||||||
|         let available = stream.send_flow.available(); |         let available = stream.send_flow.available().as_size(); | ||||||
|         stream.send_flow.claim_capacity(available); |         stream.send_flow.claim_capacity(available); | ||||||
|  |  | ||||||
|         let frame = frame::Reset::new(stream.id, reason); |         let frame = frame::Reset::new(stream.id, reason); | ||||||
| @@ -224,7 +224,7 @@ where | |||||||
|  |  | ||||||
|     /// Current available stream send capacity |     /// Current available stream send capacity | ||||||
|     pub fn capacity(&self, stream: &mut store::Ptr<B, P>) -> WindowSize { |     pub fn capacity(&self, stream: &mut store::Ptr<B, P>) -> WindowSize { | ||||||
|         let available = stream.send_flow.available(); |         let available = stream.send_flow.available().as_size(); | ||||||
|         let buffered = stream.buffered_send_data; |         let buffered = stream.buffered_send_data; | ||||||
|  |  | ||||||
|         if available <= buffered { |         if available <= buffered { | ||||||
| @@ -265,7 +265,7 @@ where | |||||||
|  |  | ||||||
|         // Reclaim all capacity assigned to the stream and re-assign it to the |         // Reclaim all capacity assigned to the stream and re-assign it to the | ||||||
|         // connection |         // connection | ||||||
|         let available = stream.send_flow.available(); |         let available = stream.send_flow.available().as_size(); | ||||||
|         stream.send_flow.claim_capacity(available); |         stream.send_flow.claim_capacity(available); | ||||||
|         // Re-assign all capacity to the connection |         // Re-assign all capacity to the connection | ||||||
|         self.prioritize |         self.prioritize | ||||||
| @@ -308,6 +308,10 @@ where | |||||||
|                     let stream = &mut *stream; |                     let stream = &mut *stream; | ||||||
|  |  | ||||||
|                     stream.send_flow.dec_window(dec); |                     stream.send_flow.dec_window(dec); | ||||||
|  |  | ||||||
|  |                     let available = stream.send_flow.available().as_size(); | ||||||
|  |                     stream.send_flow.claim_capacity(cmp::min(dec, available)); | ||||||
|  |  | ||||||
|                     trace!( |                     trace!( | ||||||
|                         "decremented stream window; id={:?}; decr={}; flow={:?}", |                         "decremented stream window; id={:?}; decr={}; flow={:?}", | ||||||
|                         stream.id, |                         stream.id, | ||||||
|   | |||||||
| @@ -80,6 +80,15 @@ where | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn set_target_connection_window_size(&mut self, size: WindowSize) { | ||||||
|  |         let mut me = self.inner.lock().unwrap(); | ||||||
|  |         let me = &mut *me; | ||||||
|  |  | ||||||
|  |         me.actions | ||||||
|  |             .recv | ||||||
|  |             .set_target_connection_window(size, &mut me.actions.task) | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /// Process inbound headers |     /// Process inbound headers | ||||||
|     pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), RecvError> { |     pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), RecvError> { | ||||||
|         let id = frame.stream_id(); |         let id = frame.stream_id(); | ||||||
|   | |||||||
| @@ -121,6 +121,14 @@ where | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Sets the target window size for the whole connection. | ||||||
|  |     /// | ||||||
|  |     /// Default in HTTP2 is 65_535. | ||||||
|  |     pub fn set_target_window_size(&mut self, size: u32) { | ||||||
|  |         assert!(size <= proto::MAX_WINDOW_SIZE); | ||||||
|  |         self.connection.set_target_window_size(size); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /// Returns `Ready` when the underlying connection has closed. |     /// Returns `Ready` when the underlying connection has closed. | ||||||
|     pub fn poll_close(&mut self) -> Poll<(), ::Error> { |     pub fn poll_close(&mut self) -> Poll<(), ::Error> { | ||||||
|         self.connection.poll().map_err(Into::into) |         self.connection.poll().map_err(Into::into) | ||||||
|   | |||||||
| @@ -800,3 +800,132 @@ fn recv_settings_removes_available_capacity() { | |||||||
|     let _ = h2.join(srv) |     let _ = h2.join(srv) | ||||||
|         .wait().unwrap(); |         .wait().unwrap(); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn client_increase_target_window_size() { | ||||||
|  |     let _ = ::env_logger::init(); | ||||||
|  |     let (io, srv) = mock::new(); | ||||||
|  |  | ||||||
|  |     let srv = srv.assert_client_handshake() | ||||||
|  |         .unwrap() | ||||||
|  |         .recv_settings() | ||||||
|  |         .recv_frame(frames::window_update(0, (2 << 20) - 65_535)) | ||||||
|  |         .close(); | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     let client = Client::handshake(io).unwrap() | ||||||
|  |         .and_then(|(_client, mut conn)| { | ||||||
|  |             conn.set_target_window_size(2 << 20); | ||||||
|  |  | ||||||
|  |             conn.unwrap() | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |     srv.join(client).wait().unwrap(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn increase_target_window_size_after_using_some() { | ||||||
|  |     let _ = ::env_logger::init(); | ||||||
|  |     let (io, srv) = mock::new(); | ||||||
|  |  | ||||||
|  |     let srv = srv.assert_client_handshake() | ||||||
|  |         .unwrap() | ||||||
|  |         .recv_settings() | ||||||
|  |         .recv_frame( | ||||||
|  |             frames::headers(1) | ||||||
|  |                 .request("GET", "https://http2.akamai.com/") | ||||||
|  |                 .eos() | ||||||
|  |         ) | ||||||
|  |         .send_frame(frames::headers(1).response(200)) | ||||||
|  |         .send_frame(frames::data(1, vec![0; 16_384]).eos()) | ||||||
|  |         .recv_frame(frames::window_update(0, (2 << 20) - 65_535)) | ||||||
|  |         .close(); | ||||||
|  |  | ||||||
|  |     let client = Client::handshake(io).unwrap() | ||||||
|  |         .and_then(|(mut client, conn)| { | ||||||
|  |             let request = Request::builder() | ||||||
|  |                 .uri("https://http2.akamai.com/") | ||||||
|  |                 .body(()).unwrap(); | ||||||
|  |  | ||||||
|  |             let res = client.send_request(request, true).unwrap() | ||||||
|  |                 .and_then(|res| { | ||||||
|  |                     // "leak" the capacity for now | ||||||
|  |                     res.into_parts().1.concat2() | ||||||
|  |                 }); | ||||||
|  |  | ||||||
|  |             conn.drive(res) | ||||||
|  |                 .and_then(|(mut conn, _bytes)| { | ||||||
|  |                     conn.set_target_window_size(2 << 20); | ||||||
|  |                     conn.unwrap() | ||||||
|  |                 }) | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |     srv.join(client).wait().unwrap(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn decrease_target_window_size() { | ||||||
|  |      let _ = ::env_logger::init(); | ||||||
|  |     let (io, srv) = mock::new(); | ||||||
|  |  | ||||||
|  |     let srv = srv.assert_client_handshake() | ||||||
|  |         .unwrap() | ||||||
|  |         .recv_settings() | ||||||
|  |         .recv_frame( | ||||||
|  |             frames::headers(1) | ||||||
|  |                 .request("GET", "https://http2.akamai.com/") | ||||||
|  |                 .eos() | ||||||
|  |         ) | ||||||
|  |         .send_frame(frames::headers(1).response(200)) | ||||||
|  |         .send_frame(frames::data(1, vec![0; 16_384])) | ||||||
|  |         .send_frame(frames::data(1, vec![0; 16_384])) | ||||||
|  |         .send_frame(frames::data(1, vec![0; 16_384])) | ||||||
|  |         .send_frame(frames::data(1, vec![0; 16_383]).eos()) | ||||||
|  |         .recv_frame(frames::window_update(0, 16_384)) | ||||||
|  |         .close(); | ||||||
|  |  | ||||||
|  |     let client = Client::handshake(io).unwrap() | ||||||
|  |         .and_then(|(mut client, mut conn)| { | ||||||
|  |             conn.set_target_window_size(16_384 * 2); | ||||||
|  |  | ||||||
|  |             let request = Request::builder() | ||||||
|  |                 .uri("https://http2.akamai.com/") | ||||||
|  |                 .body(()).unwrap(); | ||||||
|  |             let res = client.send_request(request, true).unwrap(); | ||||||
|  |             conn.drive(res.expect("response")) | ||||||
|  |         }) | ||||||
|  |         .and_then(|(mut conn, res)| { | ||||||
|  |             conn.set_target_window_size(16_384); | ||||||
|  |             let mut body = res.into_parts().1; | ||||||
|  |             let mut cap = body.release_capacity().clone(); | ||||||
|  |  | ||||||
|  |             conn.drive(body.concat2().expect("concat")) | ||||||
|  |                 .and_then(move |(conn, bytes)| { | ||||||
|  |                     assert_eq!(bytes.len(), 65_535); | ||||||
|  |                     cap.release_capacity(bytes.len()).unwrap(); | ||||||
|  |                     conn.expect("conn") | ||||||
|  |                 }) | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |     srv.join(client).wait().unwrap(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn server_target_window_size() { | ||||||
|  |     let _ = ::env_logger::init(); | ||||||
|  |     let (io, client) = mock::new(); | ||||||
|  |  | ||||||
|  |     let client = client.assert_server_handshake() | ||||||
|  |         .unwrap() | ||||||
|  |         .recv_settings() | ||||||
|  |         .recv_frame(frames::window_update(0, (2 << 20) - 65_535)) | ||||||
|  |         .close(); | ||||||
|  |  | ||||||
|  |     let srv = Server::handshake(io).unwrap() | ||||||
|  |         .and_then(|mut conn| { | ||||||
|  |             conn.set_target_window_size(2 << 20); | ||||||
|  |             conn.into_future().unwrap() | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |     srv.join(client).wait().unwrap(); | ||||||
|  | } | ||||||
|   | |||||||
| @@ -538,7 +538,7 @@ impl Future for Idle { | |||||||
|         match self.handle.as_mut().unwrap().poll() { |         match self.handle.as_mut().unwrap().poll() { | ||||||
|             Ok(Async::NotReady) => Ok(Async::NotReady), |             Ok(Async::NotReady) => Ok(Async::NotReady), | ||||||
|             res => { |             res => { | ||||||
|                 panic!("Received unexpected frame on handle; frame={:?}", res); |                 panic!("Idle received unexpected frame on handle; frame={:?}", res); | ||||||
|             }, |             }, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user