Simplify control streams (#8)
This commit is contained in:
		| @@ -52,16 +52,8 @@ impl Peer for Client { | ||||
|     type Send = http::request::Head; | ||||
|     type Poll = http::response::Head; | ||||
|  | ||||
|     fn is_valid_local_stream_id(id: StreamId) -> bool { | ||||
|         id.is_client_initiated() | ||||
|     } | ||||
|  | ||||
|     fn is_valid_remote_stream_id(id: StreamId) -> bool { | ||||
|         id.is_server_initiated() | ||||
|     } | ||||
|  | ||||
|     fn local_can_open() -> bool { | ||||
|         true | ||||
|     fn is_server() -> bool { | ||||
|         false | ||||
|     } | ||||
|  | ||||
|     fn convert_send_message( | ||||
|   | ||||
							
								
								
									
										18
									
								
								src/lib.rs
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								src/lib.rs
									
									
									
									
									
								
							| @@ -77,23 +77,7 @@ pub trait Peer { | ||||
|     /// Message type polled from the transport | ||||
|     type Poll; | ||||
|  | ||||
|     /// Returns `true` if `id` is a valid StreamId for a stream initiated by the | ||||
|     /// local node. | ||||
|     fn is_valid_local_stream_id(id: StreamId) -> bool; | ||||
|  | ||||
|     /// Returns `true` if `id` is a valid StreamId for a stream initiated by the | ||||
|     /// remote node. | ||||
|     fn is_valid_remote_stream_id(id: StreamId) -> bool; | ||||
|  | ||||
|     fn local_can_open() -> bool; | ||||
|     fn remote_can_open() -> bool { | ||||
|         !Self::local_can_open() | ||||
|     } | ||||
|  | ||||
|     //fn can_reserve_local_stream() -> bool; | ||||
|     // fn can_reserve_remote_stream() -> bool { | ||||
|     //     !self.can_reserve_local_stream | ||||
|     // } | ||||
|     fn is_server() -> bool; | ||||
|  | ||||
|     #[doc(hidden)] | ||||
|     fn convert_send_message( | ||||
|   | ||||
| @@ -13,13 +13,13 @@ use std::marker::PhantomData; | ||||
| /// An H2 connection | ||||
| #[derive(Debug)] | ||||
| pub struct Connection<T, P, B: IntoBuf = Bytes> { | ||||
|     inner: Transport<T, P, B::Buf>, | ||||
|     inner: Transport<T, B::Buf>, | ||||
|     // Set to `true` as long as the connection is in a valid state. | ||||
|     active: bool, | ||||
|     _phantom: PhantomData<(P, B)>, | ||||
| } | ||||
|  | ||||
| pub fn new<T, P, B>(transport: Transport<T, P, B::Buf>) | ||||
| pub fn new<T, P, B>(transport: Transport<T, B::Buf>) | ||||
|     -> Connection<T, P, B> | ||||
|     where T: AsyncRead + AsyncWrite, | ||||
|           P: Peer, | ||||
|   | ||||
| @@ -1,204 +1,22 @@ | ||||
| use ConnectionError; | ||||
| use proto::*; | ||||
|  | ||||
| /// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up | ||||
| /// to Connection). | ||||
| pub trait ControlStreams { | ||||
|     /// Determines whether the given stream could theoretically be opened by the local | ||||
|     /// side of this connection. | ||||
|     fn local_valid_id(id: StreamId) -> bool; | ||||
|     fn streams(&self) -> &Streams; | ||||
|  | ||||
|     /// Determines whether the given stream could theoretically be opened by the remote | ||||
|     /// side of this connection. | ||||
|     fn remote_valid_id(id: StreamId) -> bool; | ||||
|  | ||||
|     /// Indicates whether this local endpoint may open streams (with HEADERS). | ||||
|     /// | ||||
|     /// Implies that this endpoint is a client. | ||||
|     fn local_can_open() -> bool; | ||||
|  | ||||
|     /// Indicates whether this remote endpoint may open streams (with HEADERS). | ||||
|     /// | ||||
|     /// Implies that this endpoint is a server. | ||||
|     fn remote_can_open() -> bool { | ||||
|         !Self::local_can_open() | ||||
|     } | ||||
|  | ||||
|     // TODO push promise | ||||
|     // fn local_can_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; | ||||
|     // fn remote_can_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; | ||||
|  | ||||
|     /// Creates a new stream in the OPEN state from the local side (i.e. as a Client). | ||||
|     /// | ||||
|     /// Must only be called when local_can_open returns true. | ||||
|     fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; | ||||
|  | ||||
|     /// Create a new stream in the OPEN state from the remote side (i.e. as a Server). | ||||
|     /// | ||||
|     /// Must only be called when remote_can_open returns true. | ||||
|     fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; | ||||
|  | ||||
|     /// Prepare the receive side of a local stream to receive data from the remote. | ||||
|     /// | ||||
|     /// Typically called when a client receives a response header. | ||||
|     fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; | ||||
|  | ||||
|     /// Prepare the send side of a remote stream to receive data from the local endpoint. | ||||
|     /// | ||||
|     /// Typically called when a server sends a response header. | ||||
|     fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; | ||||
|  | ||||
|     // TODO push promise | ||||
|     // fn local_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; | ||||
|     // fn remote_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; | ||||
|  | ||||
|     /// Closes the send half of a stream. | ||||
|     /// | ||||
|     /// Fails with a ProtocolError if send half of the stream was not open. | ||||
|     fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; | ||||
|  | ||||
|     /// Closes the recv half of a stream. | ||||
|     /// | ||||
|     /// Fails with a ProtocolError if recv half of the stream was not open. | ||||
|     fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; | ||||
|  | ||||
|     /// Resets the given stream. | ||||
|     /// | ||||
|     /// If the stream was already reset, the stored cause is updated. | ||||
|     fn reset_stream(&mut self, id: StreamId, cause: Reason); | ||||
|  | ||||
|     /// Get the reason the stream was reset, if it was reset. | ||||
|     fn get_reset(&self, id: StreamId) -> Option<Reason>; | ||||
|  | ||||
|     /// Returns true if the given stream was opened by the local peer and is not yet | ||||
|     /// closed. | ||||
|     fn is_local_active(&self, id: StreamId) -> bool; | ||||
|  | ||||
|     /// Returns true if the given stream was opened by the remote peer and is not yet | ||||
|     /// closed. | ||||
|     fn is_remote_active(&self, id: StreamId) -> bool; | ||||
|  | ||||
|     /// Returns true if the given stream was opened and is not yet closed. | ||||
|     fn is_active(&self, id: StreamId) -> bool { | ||||
|         if Self::local_valid_id(id) { | ||||
|             self.is_local_active(id) | ||||
|         } else { | ||||
|             self.is_remote_active(id) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Returns the number of open streams initiated by the local peer. | ||||
|     fn local_active_len(&self) -> usize; | ||||
|  | ||||
|     /// Returns the number of open streams initiated by the remote peer. | ||||
|     fn remote_active_len(&self) -> usize; | ||||
|  | ||||
|     /// Returns true iff the recv half of the given stream is open. | ||||
|     fn is_recv_open(&mut self, id: StreamId) -> bool; | ||||
|  | ||||
|     /// Returns true iff the send half of the given stream is open. | ||||
|     fn is_send_open(&mut self, id: StreamId) -> bool; | ||||
|  | ||||
|     /// If the given stream ID is active and able to recv data, get its mutable recv flow | ||||
|     /// control state. | ||||
|     fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; | ||||
|  | ||||
|     /// If the given stream ID is active and able to send data, get its mutable send flow | ||||
|     /// control state. | ||||
|     fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; | ||||
|  | ||||
|     /// Updates the initial window size for the local peer. | ||||
|     fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize); | ||||
|  | ||||
|     /// Updates the initial window size for the remote peer. | ||||
|     fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize); | ||||
|     fn streams_mut(&mut self) -> &mut Streams; | ||||
| } | ||||
|  | ||||
| macro_rules! proxy_control_streams { | ||||
|     ($outer:ident) => ( | ||||
|         impl<T: ControlStreams> ControlStreams for $outer<T> { | ||||
|             fn local_valid_id(id: StreamId) -> bool { | ||||
|                 T::local_valid_id(id) | ||||
|             fn streams(&self) -> &Streams { | ||||
|                 self.inner.streams() | ||||
|             } | ||||
|  | ||||
|             fn remote_valid_id(id: StreamId) -> bool { | ||||
|                 T::remote_valid_id(id) | ||||
|             } | ||||
|  | ||||
|             fn local_can_open() -> bool { | ||||
|                 T::local_can_open() | ||||
|             } | ||||
|  | ||||
|             fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|                 self.inner.local_open(id, sz) | ||||
|             } | ||||
|  | ||||
|             fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|                 self.inner.remote_open(id, sz) | ||||
|             } | ||||
|  | ||||
|             fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|                 self.inner.local_open_recv_half(id, sz) | ||||
|             } | ||||
|  | ||||
|             fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|                 self.inner.remote_open_send_half(id, sz) | ||||
|             } | ||||
|  | ||||
|             fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { | ||||
|                 self.inner.close_send_half(id) | ||||
|             } | ||||
|  | ||||
|             fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { | ||||
|                 self.inner.close_recv_half(id) | ||||
|             } | ||||
|  | ||||
|             fn reset_stream(&mut self, id: StreamId, cause: Reason) { | ||||
|                 self.inner.reset_stream(id, cause) | ||||
|             } | ||||
|  | ||||
|             fn get_reset(&self, id: StreamId) -> Option<Reason> { | ||||
|                 self.inner.get_reset(id) | ||||
|             } | ||||
|  | ||||
|             fn is_local_active(&self, id: StreamId) -> bool { | ||||
|                 self.inner.is_local_active(id) | ||||
|             } | ||||
|  | ||||
|             fn is_remote_active(&self, id: StreamId) -> bool { | ||||
|                 self.inner.is_remote_active(id) | ||||
|             } | ||||
|  | ||||
|             fn local_active_len(&self) -> usize { | ||||
|                 self.inner.local_active_len() | ||||
|             } | ||||
|  | ||||
|             fn remote_active_len(&self) -> usize { | ||||
|                 self.inner.remote_active_len() | ||||
|             } | ||||
|  | ||||
|             fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { | ||||
|                 self.inner.update_inital_recv_window_size(old_sz, new_sz) | ||||
|             } | ||||
|  | ||||
|             fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { | ||||
|                 self.inner.update_inital_send_window_size(old_sz, new_sz) | ||||
|             } | ||||
|  | ||||
|             fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { | ||||
|                 self.inner.recv_flow_controller(id) | ||||
|             } | ||||
|  | ||||
|             fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { | ||||
|                 self.inner.send_flow_controller(id) | ||||
|             } | ||||
|  | ||||
|             fn is_send_open(&mut self, id: StreamId) -> bool { | ||||
|                 self.inner.is_send_open(id) | ||||
|             } | ||||
|  | ||||
|             fn is_recv_open(&mut self, id: StreamId) -> bool  { | ||||
|                 self.inner.is_recv_open(id) | ||||
|             fn streams_mut(&mut self) -> &mut Streams { | ||||
|                 self.inner.streams_mut() | ||||
|             } | ||||
|         } | ||||
|     ) | ||||
|   | ||||
| @@ -44,7 +44,7 @@ impl<T, U> FlowControlRecv<T> | ||||
| /// Exposes a public upward API for flow control. | ||||
| impl<T: ControlStreams> ControlFlowRecv for FlowControlRecv<T> { | ||||
|     fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|         let added = match self.recv_flow_controller(id) { | ||||
|         let added = match self.streams_mut().recv_flow_controller(id) { | ||||
|             None => false, | ||||
|             Some(mut fc) => { | ||||
|                 fc.expand_window(incr); | ||||
| @@ -57,7 +57,7 @@ impl<T: ControlStreams> ControlFlowRecv for FlowControlRecv<T> { | ||||
|                 self.pending_streams.push_back(id); | ||||
|             } | ||||
|             Ok(()) | ||||
|         } else if let Some(rst) = self.inner.get_reset(id) { | ||||
|         } else if let Some(rst) = self.streams().get_reset(id) { | ||||
|             Err(error::User::StreamReset(rst).into()) | ||||
|         } else { | ||||
|             Err(error::User::InvalidStreamId.into()) | ||||
| @@ -80,8 +80,8 @@ impl<T, U> FlowControlRecv<T> | ||||
|         } | ||||
|  | ||||
|         while let Some(id) = self.pending_streams.pop_front() { | ||||
|             if self.inner.get_reset(id).is_none() { | ||||
|                 let update = self.recv_flow_controller(id).and_then(|s| s.apply_window_update()); | ||||
|             if self.streams().get_reset(id).is_none() { | ||||
|                 let update = self.streams_mut().recv_flow_controller(id).and_then(|s| s.apply_window_update()); | ||||
|                 if let Some(incr) = update { | ||||
|                     try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); | ||||
|                 } | ||||
| @@ -124,8 +124,9 @@ impl<T> Stream for FlowControlRecv<T> | ||||
|                         return Err(error::Reason::FlowControlError.into()); | ||||
|                     } | ||||
|  | ||||
|                     let fc = self.inner.recv_flow_controller(id) | ||||
|                     let fc = self.inner.streams_mut().recv_flow_controller(id) | ||||
|                         .expect("receiving data with no flow controller"); | ||||
|  | ||||
|                     if fc.claim_window(sz).is_err() { | ||||
|                         // TODO this should cause a GO_AWAY | ||||
|                         return Err(error::Reason::FlowControlError.into()); | ||||
| @@ -206,7 +207,7 @@ impl<T> ApplySettings for FlowControlRecv<T> | ||||
|                 return Ok(()); | ||||
|             } | ||||
|  | ||||
|             self.inner.update_inital_recv_window_size(old_window_size, new_window_size); | ||||
|             self.streams_mut().update_inital_recv_window_size(old_window_size, new_window_size); | ||||
|             self.initial_window_size = new_window_size; | ||||
|         } | ||||
|         Ok(()) | ||||
|   | ||||
| @@ -50,7 +50,7 @@ impl<T: ControlStreams> ControlFlowSend for FlowControlSend<T> { | ||||
|  | ||||
|         // TODO this should probably account for stream priority? | ||||
|         while let Some(id) = self.pending_streams.pop_front() { | ||||
|             if let Some(mut flow) = self.send_flow_controller(id) { | ||||
|             if let Some(mut flow) = self.streams_mut().send_flow_controller(id) { | ||||
|                 if let Some(incr) = flow.apply_window_update() { | ||||
|                     return Ok(Async::Ready(WindowUpdate::new(id, incr))); | ||||
|                 } | ||||
| @@ -84,7 +84,7 @@ impl<T> Stream for FlowControlSend<T> | ||||
|                     } else { | ||||
|                         // The remote may send window updates for streams that the local | ||||
|                         // now considers closed. It's okay. | ||||
|                         if let Some(fc) = self.inner.send_flow_controller(id) { | ||||
|                         if let Some(fc) = self.streams_mut().send_flow_controller(id) { | ||||
|                             fc.expand_window(sz); | ||||
|                         } | ||||
|                     } | ||||
| @@ -110,7 +110,7 @@ impl<T, U> Sink for FlowControlSend<T> | ||||
|     type SinkError = T::SinkError; | ||||
|  | ||||
|     fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> { | ||||
|         debug_assert!(self.inner.get_reset(frame.stream_id()).is_none()); | ||||
|         debug_assert!(self.streams().get_reset(frame.stream_id()).is_none()); | ||||
|  | ||||
|         // Ensures that the underlying transport is will accept the frame. It's important | ||||
|         //  that this be checked before claiming capacity from the flow controllers. | ||||
| @@ -130,8 +130,9 @@ impl<T, U> Sink for FlowControlSend<T> | ||||
|             } | ||||
|  | ||||
|             // Ensure there's enough capacity on stream. | ||||
|             let mut fc = self.inner.send_flow_controller(v.stream_id()) | ||||
|             let mut fc = self.inner.streams_mut().send_flow_controller(v.stream_id()) | ||||
|                 .expect("no remote stream for data frame"); | ||||
|  | ||||
|             if fc.claim_window(sz).is_err() { | ||||
|                 return Err(error::User::FlowControlViolation.into()) | ||||
|             } | ||||
| @@ -195,7 +196,7 @@ impl<T> ApplySettings for FlowControlSend<T> | ||||
|                 return Ok(()); | ||||
|             } | ||||
|  | ||||
|             self.inner.update_inital_send_window_size(old_window_size, new_window_size); | ||||
|             self.streams_mut().update_inital_send_window_size(old_window_size, new_window_size); | ||||
|             self.initial_window_size = new_window_size; | ||||
|         } | ||||
|  | ||||
|   | ||||
| @@ -97,7 +97,7 @@ use self::stream_recv_close::StreamRecvClose; | ||||
| use self::stream_recv_open::StreamRecvOpen; | ||||
| use self::stream_send_close::StreamSendClose; | ||||
| use self::stream_send_open::StreamSendOpen; | ||||
| use self::stream_states::StreamStates; | ||||
| use self::stream_states::{StreamStates, Streams}; | ||||
|  | ||||
| /// Represents the internals of an HTTP/2 connection. | ||||
| /// | ||||
| @@ -187,22 +187,22 @@ use self::stream_states::StreamStates; | ||||
| /// | ||||
| /// - Encodes frames to bytes. | ||||
| /// | ||||
| type Transport<T, P, B>= | ||||
| type Transport<T, B>= | ||||
|     Settings< | ||||
|         Streams< | ||||
|         Streams2< | ||||
|             PingPong< | ||||
|                 Codec<T, B>, | ||||
|                 B>, | ||||
|             P>>; | ||||
|                 B>>>; | ||||
|  | ||||
| type Streams<T, P> = | ||||
| // TODO: rename | ||||
| type Streams2<T> = | ||||
|     StreamSendOpen< | ||||
|         FlowControlSend< | ||||
|             StreamSendClose< | ||||
|                 StreamRecvClose< | ||||
|                     FlowControlRecv< | ||||
|                         StreamRecvOpen< | ||||
|                             StreamStates<T, P>>>>>>>; | ||||
|                             StreamStates<T>>>>>>>; | ||||
|  | ||||
| type Codec<T, B> = | ||||
|     FramedRead< | ||||
| @@ -303,7 +303,7 @@ pub fn from_server_handshaker<T, P, B>(settings: Settings<FramedWrite<T, B::Buf> | ||||
|                             StreamRecvOpen::new( | ||||
|                                 initial_recv_window_size, | ||||
|                                 local_max_concurrency, | ||||
|                                 StreamStates::new( | ||||
|                                 StreamStates::new::<P>( | ||||
|                                     PingPong::new( | ||||
|                                         FramedRead::new(framed))))))))) | ||||
|     }); | ||||
|   | ||||
| @@ -1,5 +1,4 @@ | ||||
| use ConnectionError; | ||||
| use error::Reason; | ||||
| use frame::{self, Frame}; | ||||
| use proto::*; | ||||
| use proto::ready::ReadySink; | ||||
| @@ -39,10 +38,10 @@ impl<T> Stream for StreamRecvClose<T> | ||||
|             if frame.is_end_stream() { | ||||
|                 trace!("poll: id={:?} eos", id); | ||||
|                 if let &Frame::Reset(ref rst) = &frame { | ||||
|                     self.inner.reset_stream(id, rst.reason()); | ||||
|                     self.streams_mut().reset_stream(id, rst.reason()); | ||||
|                 } else { | ||||
|                     debug_assert!(self.inner.is_active(id)); | ||||
|                     self.inner.close_recv_half(id)?; | ||||
|                     debug_assert!(self.streams().is_active(id)); | ||||
|                     self.streams_mut().close_recv_half(id)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|   | ||||
| @@ -42,7 +42,7 @@ impl<T, U> StreamRecvOpen<T> | ||||
|         let f = frame::Reset::new(id, RefusedStream); | ||||
|         match self.inner.start_send(f.into())? { | ||||
|             AsyncSink::Ready => { | ||||
|                 self.inner.reset_stream(id, RefusedStream); | ||||
|                 self.streams_mut().reset_stream(id, RefusedStream); | ||||
|                 Ok(Async::Ready(())) | ||||
|             } | ||||
|             AsyncSink::NotReady(_) => { | ||||
| @@ -81,7 +81,7 @@ impl<T> ApplySettings for StreamRecvOpen<T> | ||||
| impl<T: ControlStreams> StreamRecvOpen<T> { | ||||
|     fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> { | ||||
|         // Ensure that the stream hasn't been closed otherwise. | ||||
|         match self.inner.get_reset(id) { | ||||
|         match self.streams().get_reset(id) { | ||||
|             Some(reason) => Err(reason.into()), | ||||
|             None => Ok(()), | ||||
|         } | ||||
| @@ -127,20 +127,20 @@ impl<T, U> Stream for StreamRecvOpen<T> | ||||
|                 &Frame::Headers(..) => { | ||||
|                     self.check_not_reset(id)?; | ||||
|  | ||||
|                     if T::remote_valid_id(id) { | ||||
|                         if self.inner.is_remote_active(id) { | ||||
|                     if self.streams().is_valid_remote_stream_id(id) { | ||||
|                         if self.streams().is_remote_active(id) { | ||||
|                             // Can't send a a HEADERS frame on a remote stream that's | ||||
|                             // active, because we've already received headers.  This will | ||||
|                             // have to change to support PUSH_PROMISE. | ||||
|                             return Err(ProtocolError.into()); | ||||
|                         } | ||||
|  | ||||
|                         if !T::remote_can_open() { | ||||
|                         if !self.streams().can_remote_open() { | ||||
|                             return Err(ProtocolError.into()); | ||||
|                         } | ||||
|  | ||||
|                         if let Some(max) = self.max_concurrency { | ||||
|                             if (max as usize) < self.inner.remote_active_len() { | ||||
|                             if (max as usize) < self.streams().remote_active_len() { | ||||
|                                 debug!("refusing stream that would exceed max_concurrency={}", max); | ||||
|                                 self.send_refuse(id)?; | ||||
|  | ||||
| @@ -149,17 +149,17 @@ impl<T, U> Stream for StreamRecvOpen<T> | ||||
|                             } | ||||
|                         } | ||||
|  | ||||
|                         self.inner.remote_open(id, self.initial_window_size)?; | ||||
|                         self.inner.streams_mut().remote_open(id, self.initial_window_size)?; | ||||
|                     } else { | ||||
|                         // On remote streams, | ||||
|                         self.inner.local_open_recv_half(id, self.initial_window_size)?; | ||||
|                         self.inner.streams_mut().local_open_recv_half(id, self.initial_window_size)?; | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 // All other stream frames are sent only when | ||||
|                 _ => { | ||||
|                     self.check_not_reset(id)?; | ||||
|                     if !self.inner.is_recv_open(id) { | ||||
|                     if !self.streams().is_recv_open(id) { | ||||
|                         return Err(ProtocolError.into()); | ||||
|                     } | ||||
|                 } | ||||
|   | ||||
| @@ -1,5 +1,4 @@ | ||||
| use ConnectionError; | ||||
| use error::Reason; | ||||
| use frame::{self, Frame}; | ||||
| use proto::*; | ||||
|  | ||||
| @@ -34,10 +33,10 @@ impl<T, U> Sink for StreamSendClose<T> | ||||
|         if !id.is_zero() { | ||||
|             if eos { | ||||
|                 if let &Frame::Reset(ref rst) = &frame { | ||||
|                     self.inner.reset_stream(id, rst.reason()); | ||||
|                     self.streams_mut().reset_stream(id, rst.reason()); | ||||
|                 } else { | ||||
|                     debug_assert!(self.inner.is_active(id)); | ||||
|                     self.inner.close_send_half(id)?; | ||||
|                     debug_assert!(self.streams().is_active(id)); | ||||
|                     self.streams_mut().close_send_half(id)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|   | ||||
| @@ -49,7 +49,7 @@ impl<T: ApplySettings> ApplySettings for StreamSendOpen<T> { | ||||
| impl<T: ControlStreams> StreamSendOpen<T> { | ||||
|     fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> { | ||||
|         // Ensure that the stream hasn't been closed otherwise. | ||||
|         match self.inner.get_reset(id) { | ||||
|         match self.streams().get_reset(id) { | ||||
|             Some(reason) => Err(StreamReset(reason).into()), | ||||
|             None => Ok(()), | ||||
|         } | ||||
| @@ -82,15 +82,15 @@ impl<T, U> Sink for StreamSendOpen<T> | ||||
|  | ||||
|             &Frame::Headers(..) => { | ||||
|                 self.check_not_reset(id)?; | ||||
|                 if T::local_valid_id(id) { | ||||
|                     if self.inner.is_local_active(id) { | ||||
|                 if self.streams().is_valid_local_stream_id(id) { | ||||
|                     if self.streams().is_local_active(id) { | ||||
|                         // Can't send a a HEADERS frame on a local stream that's active, | ||||
|                         // because we've already sent headers.  This will have to change | ||||
|                         // to support PUSH_PROMISE. | ||||
|                         return Err(UnexpectedFrameType.into()); | ||||
|                     } | ||||
|  | ||||
|                     if !T::local_can_open() { | ||||
|                     if !self.streams().can_local_open() { | ||||
|                         // A server tried to start a stream with a HEADERS frame. | ||||
|                         return Err(UnexpectedFrameType.into()); | ||||
|                     } | ||||
| @@ -98,15 +98,15 @@ impl<T, U> Sink for StreamSendOpen<T> | ||||
|                     if let Some(max) = self.max_concurrency { | ||||
|                         // Don't allow this stream to overflow the remote's max stream | ||||
|                         // concurrency. | ||||
|                         if (max as usize) < self.inner.local_active_len() { | ||||
|                         if (max as usize) < self.streams().local_active_len() { | ||||
|                             return Err(Rejected.into()); | ||||
|                         } | ||||
|                     } | ||||
|  | ||||
|                     self.inner.local_open(id, self.initial_window_size)?; | ||||
|                     self.inner.streams_mut().local_open(id, self.initial_window_size)?; | ||||
|                 } else { | ||||
|                     // On remote streams, | ||||
|                     if self.inner.remote_open_send_half(id, self.initial_window_size).is_err() { | ||||
|                     if self.inner.streams_mut().remote_open_send_half(id, self.initial_window_size).is_err() { | ||||
|                         return Err(InvalidStreamId.into()); | ||||
|                     } | ||||
|                 } | ||||
| @@ -116,7 +116,7 @@ impl<T, U> Sink for StreamSendOpen<T> | ||||
|             // the stream is open (i.e. has already sent headers). | ||||
|             _ => { | ||||
|                 self.check_not_reset(id)?; | ||||
|                 if !self.inner.is_send_open(id) { | ||||
|                 if !self.streams().is_send_open(id) { | ||||
|                     return Err(InactiveStreamId.into()); | ||||
|                 } | ||||
|             } | ||||
|   | ||||
| @@ -6,14 +6,20 @@ use proto::stream_state::StreamState; | ||||
| use fnv::FnvHasher; | ||||
| use ordermap::OrderMap; | ||||
| use std::hash::BuildHasherDefault; | ||||
| use std::marker::PhantomData; | ||||
|  | ||||
| /// Holds the underlying stream state to be accessed by upper layers. | ||||
| // TODO track reserved streams | ||||
| // TODO constrain the size of `reset` | ||||
| #[derive(Debug, Default)] | ||||
| pub struct StreamStates<T, P> { | ||||
| #[derive(Debug)] | ||||
| pub struct StreamStates<T> { | ||||
|     inner: T, | ||||
|     streams: Streams, | ||||
| } | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct Streams { | ||||
|     /// True when in the context of an H2 server. | ||||
|     is_server: bool, | ||||
|  | ||||
|     /// Holds active streams initiated by the local endpoint. | ||||
|     local_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>, | ||||
| @@ -23,30 +29,56 @@ pub struct StreamStates<T, P> { | ||||
|  | ||||
|     /// Holds active streams initiated by the remote. | ||||
|     reset: OrderMap<StreamId, Reason, BuildHasherDefault<FnvHasher>>, | ||||
|  | ||||
|     _phantom: PhantomData<P>, | ||||
| } | ||||
|  | ||||
| impl<T, P, U> StreamStates<T, P> | ||||
| impl<T, U> StreamStates<T> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           P: Peer, | ||||
| { | ||||
|     pub fn new(inner: T) -> StreamStates<T, P> { | ||||
|     pub fn new<P: Peer>(inner: T) -> StreamStates<T> { | ||||
|         StreamStates { | ||||
|             inner, | ||||
|             local_active: OrderMap::default(), | ||||
|             remote_active: OrderMap::default(), | ||||
|             reset: OrderMap::default(), | ||||
|             _phantom: PhantomData, | ||||
|             streams: Streams { | ||||
|                 is_server: P::is_server(), | ||||
|                 local_active: OrderMap::default(), | ||||
|                 remote_active: OrderMap::default(), | ||||
|                 reset: OrderMap::default(), | ||||
|             }, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, P: Peer> StreamStates<T, P> { | ||||
|     pub fn get_active(&mut self, id: StreamId) -> Option<&StreamState> { | ||||
| impl<T> ControlStreams for StreamStates<T> { | ||||
|     fn streams(&self) -> &Streams { | ||||
|         &self.streams | ||||
|     } | ||||
|  | ||||
|     fn streams_mut(&mut self) -> &mut Streams { | ||||
|         &mut self.streams | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Streams { | ||||
|     pub fn is_valid_local_stream_id(&self, id: StreamId) -> bool { | ||||
|         if self.is_server { | ||||
|             id.is_server_initiated() | ||||
|         } else { | ||||
|             id.is_client_initiated() | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn is_valid_remote_stream_id(&self, id: StreamId) -> bool { | ||||
|         if self.is_server { | ||||
|             id.is_client_initiated() | ||||
|         } else { | ||||
|             id.is_server_initiated() | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn get_active(&self, id: StreamId) -> Option<&StreamState> { | ||||
|         assert!(!id.is_zero()); | ||||
|         if P::is_valid_local_stream_id(id) { | ||||
|  | ||||
|         if self.is_valid_local_stream_id(id) { | ||||
|             self.local_active.get(&id) | ||||
|         } else { | ||||
|             self.remote_active.get(&id) | ||||
| @@ -55,7 +87,8 @@ impl<T, P: Peer> StreamStates<T, P> { | ||||
|  | ||||
|     pub fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { | ||||
|         assert!(!id.is_zero()); | ||||
|         if P::is_valid_local_stream_id(id) { | ||||
|  | ||||
|         if self.is_valid_local_stream_id(id) { | ||||
|             self.local_active.get_mut(&id) | ||||
|         } else { | ||||
|             self.remote_active.get_mut(&id) | ||||
| @@ -64,31 +97,27 @@ impl<T, P: Peer> StreamStates<T, P> { | ||||
|  | ||||
|     pub fn remove_active(&mut self, id: StreamId) { | ||||
|         assert!(!id.is_zero()); | ||||
|         if P::is_valid_local_stream_id(id) { | ||||
|  | ||||
|         if self.is_valid_local_stream_id(id) { | ||||
|             self.local_active.remove(&id); | ||||
|         } else { | ||||
|             self.remote_active.remove(&id); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, P: Peer> ControlStreams for StreamStates<T, P> { | ||||
|     fn local_valid_id(id: StreamId) -> bool { | ||||
|         P::is_valid_local_stream_id(id) | ||||
|     pub fn can_local_open(&self) -> bool { | ||||
|         !self.is_server | ||||
|     } | ||||
|  | ||||
|     fn remote_valid_id(id: StreamId) -> bool { | ||||
|         P::is_valid_remote_stream_id(id) | ||||
|     pub fn can_remote_open(&self) -> bool { | ||||
|         !self.can_local_open() | ||||
|     } | ||||
|  | ||||
|     fn local_can_open() -> bool { | ||||
|         P::local_can_open() | ||||
|     } | ||||
|  | ||||
|     fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|         if !Self::local_valid_id(id) || !Self::local_can_open() { | ||||
|     pub fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|         if !self.is_valid_local_stream_id(id) || !self.can_local_open() { | ||||
|             return Err(ProtocolError.into()); | ||||
|         } | ||||
|  | ||||
|         if self.local_active.contains_key(&id) { | ||||
|             return Err(ProtocolError.into()); | ||||
|         } | ||||
| @@ -97,8 +126,8 @@ impl<T, P: Peer> ControlStreams for StreamStates<T, P> { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|         if !Self::remote_valid_id(id) || !Self::remote_can_open() { | ||||
|     pub fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|         if !self.is_valid_remote_stream_id(id) || !self.can_remote_open() { | ||||
|             return Err(ProtocolError.into()); | ||||
|         } | ||||
|         if self.remote_active.contains_key(&id) { | ||||
| @@ -109,8 +138,8 @@ impl<T, P: Peer> ControlStreams for StreamStates<T, P> { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|         if !Self::local_valid_id(id) { | ||||
|     pub fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|         if !self.is_valid_local_stream_id(id) { | ||||
|             return Err(ProtocolError.into()); | ||||
|         } | ||||
|  | ||||
| @@ -120,8 +149,8 @@ impl<T, P: Peer> ControlStreams for StreamStates<T, P> { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|         if !Self::remote_valid_id(id) { | ||||
|     pub fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { | ||||
|         if !self.is_valid_remote_stream_id(id) { | ||||
|             return Err(ProtocolError.into()); | ||||
|         } | ||||
|  | ||||
| @@ -131,7 +160,7 @@ impl<T, P: Peer> ControlStreams for StreamStates<T, P> { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { | ||||
|     pub fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { | ||||
|         let fully_closed = self.get_active_mut(id) | ||||
|             .map(|s| s.close_send_half()) | ||||
|             .unwrap_or_else(|| Err(ProtocolError.into()))?; | ||||
| @@ -143,7 +172,7 @@ impl<T, P: Peer> ControlStreams for StreamStates<T, P> { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { | ||||
|     pub fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { | ||||
|         let fully_closed = self.get_active_mut(id) | ||||
|             .map(|s| s.close_recv_half()) | ||||
|             .unwrap_or_else(|| Err(ProtocolError.into()))?; | ||||
| @@ -155,46 +184,55 @@ impl<T, P: Peer> ControlStreams for StreamStates<T, P> { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn reset_stream(&mut self, id: StreamId, cause: Reason) { | ||||
|     pub fn reset_stream(&mut self, id: StreamId, cause: Reason) { | ||||
|         self.remove_active(id); | ||||
|         self.reset.insert(id, cause); | ||||
|     } | ||||
|  | ||||
|     fn get_reset(&self, id: StreamId) -> Option<Reason> { | ||||
|     pub fn get_reset(&self, id: StreamId) -> Option<Reason> { | ||||
|         self.reset.get(&id).map(|r| *r) | ||||
|     } | ||||
|  | ||||
|     fn is_local_active(&self, id: StreamId) -> bool { | ||||
|     pub fn is_local_active(&self, id: StreamId) -> bool { | ||||
|         self.local_active.contains_key(&id) | ||||
|     } | ||||
|  | ||||
|     fn is_remote_active(&self, id: StreamId) -> bool { | ||||
|     pub fn is_remote_active(&self, id: StreamId) -> bool { | ||||
|         self.remote_active.contains_key(&id) | ||||
|     } | ||||
|  | ||||
|     fn is_send_open(&mut self, id: StreamId) -> bool { | ||||
|     /// Returns true if the given stream was opened and is not yet closed. | ||||
|     pub fn is_active(&self, id: StreamId) -> bool { | ||||
|         if self.is_valid_local_stream_id(id) { | ||||
|             self.is_local_active(id) | ||||
|         } else { | ||||
|             self.is_remote_active(id) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn is_send_open(&self, id: StreamId) -> bool { | ||||
|         match self.get_active(id) { | ||||
|             Some(s) => s.is_send_open(), | ||||
|             None => false, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn is_recv_open(&mut self, id: StreamId) -> bool  { | ||||
|     pub fn is_recv_open(&self, id: StreamId) -> bool  { | ||||
|         match self.get_active(id) { | ||||
|             Some(s) => s.is_recv_open(), | ||||
|             None => false, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn local_active_len(&self) -> usize { | ||||
|     pub fn local_active_len(&self) -> usize { | ||||
|         self.local_active.len() | ||||
|     } | ||||
|  | ||||
|     fn remote_active_len(&self) -> usize { | ||||
|     pub fn remote_active_len(&self) -> usize { | ||||
|         self.remote_active.len() | ||||
|     } | ||||
|  | ||||
|     fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { | ||||
|     pub fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { | ||||
|         if new_sz < old_sz { | ||||
|             let decr = old_sz - new_sz; | ||||
|  | ||||
| @@ -226,7 +264,7 @@ impl<T, P: Peer> ControlStreams for StreamStates<T, P> { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { | ||||
|     pub fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { | ||||
|         if new_sz < old_sz { | ||||
|             let decr = old_sz - new_sz; | ||||
|  | ||||
| @@ -258,20 +296,21 @@ impl<T, P: Peer> ControlStreams for StreamStates<T, P> { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { | ||||
|     pub fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { | ||||
|         // TODO: Abstract getting the state for a stream ID | ||||
|         if id.is_zero() { | ||||
|             None | ||||
|         } else if P::is_valid_local_stream_id(id) { | ||||
|         } else if self.is_valid_local_stream_id(id) { | ||||
|             self.local_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) | ||||
|         } else { | ||||
|             self.remote_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { | ||||
|     pub fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { | ||||
|         if id.is_zero() { | ||||
|             None | ||||
|         } else if P::is_valid_local_stream_id(id) { | ||||
|         } else if self.is_valid_local_stream_id(id) { | ||||
|             self.local_active.get_mut(&id).and_then(|s| s.send_flow_controller()) | ||||
|         } else { | ||||
|             self.remote_active.get_mut(&id).and_then(|s| s.send_flow_controller()) | ||||
| @@ -279,8 +318,8 @@ impl<T, P: Peer> ControlStreams for StreamStates<T, P> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| proxy_apply_settings!(StreamStates, P); | ||||
| proxy_control_ping!(StreamStates, P); | ||||
| proxy_stream!(StreamStates, P); | ||||
| proxy_sink!(StreamStates, P); | ||||
| proxy_ready_sink!(StreamStates, P); | ||||
| proxy_apply_settings!(StreamStates); | ||||
| proxy_control_ping!(StreamStates); | ||||
| proxy_stream!(StreamStates); | ||||
| proxy_sink!(StreamStates); | ||||
| proxy_ready_sink!(StreamStates); | ||||
|   | ||||
| @@ -111,16 +111,8 @@ impl Peer for Server { | ||||
|     type Send = http::response::Head; | ||||
|     type Poll = http::request::Head; | ||||
|  | ||||
|     fn is_valid_local_stream_id(id: StreamId) -> bool { | ||||
|         id.is_server_initiated() | ||||
|     } | ||||
|  | ||||
|     fn is_valid_remote_stream_id(id: StreamId) -> bool { | ||||
|         id.is_client_initiated() | ||||
|     } | ||||
|  | ||||
|     fn local_can_open() -> bool { | ||||
|         false | ||||
|     fn is_server() -> bool { | ||||
|         true | ||||
|     } | ||||
|  | ||||
|     fn convert_send_message( | ||||
|   | ||||
		Reference in New Issue
	
	Block a user