Many more changes
This commit is contained in:
		
							
								
								
									
										69
									
								
								src/proto/codec.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								src/proto/codec.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,69 @@ | ||||
| use super::*; | ||||
| use futures::*; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct Codec<T, B> { | ||||
|     inner: FramedRead<FramedWrite<T, B>>, | ||||
| } | ||||
|  | ||||
| impl<T, B> Codec<T, B> { | ||||
|     pub fn apply_remote_settings(&mut self, frame: &frame::Settings) { | ||||
|         self.framed_read().apply_remote_settings(frame); | ||||
|         self.framed_write().apply_remote_settings(frame); | ||||
|     } | ||||
|  | ||||
|     fn framed_read(&mut self) -> &mut FramedRead<FramedWrite<T, B>> { | ||||
|         &mut self.inner | ||||
|     } | ||||
|  | ||||
|     fn framed_write(&mut self) -> &mut FramedWrite<T, B> { | ||||
|         self.inner.get_mut() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, B> Codec<T, B> | ||||
|     where T: AsyncRead + AsyncWrite, | ||||
|           B: Buf, | ||||
| { | ||||
|     pub fn from_framed(inner: FramedRead<FramedWrite<T, B>>) -> Self { | ||||
|         Codec { inner } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, B> Codec<T, B> | ||||
|     where T: AsyncWrite, | ||||
|           B: Buf, | ||||
| { | ||||
|     pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { | ||||
|         self.inner.poll_ready() | ||||
|     } | ||||
|  | ||||
| } | ||||
|  | ||||
| impl<T, B> futures::Stream for Codec<T, B> | ||||
|     where T: AsyncRead, | ||||
| { | ||||
|     type Item = Frame; | ||||
|     type Error = ConnectionError; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> { | ||||
|         use futures::Stream; | ||||
|         self.inner.poll() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, B> Sink for Codec<T, B> | ||||
|     where T: AsyncWrite, | ||||
|           B: Buf, | ||||
| { | ||||
|     type SinkItem = Frame<B>; | ||||
|     type SinkError = ConnectionError; | ||||
|  | ||||
|     fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { | ||||
|         self.inner.start_send(item) | ||||
|     } | ||||
|  | ||||
|     fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { | ||||
|         self.inner.poll_complete() | ||||
|     } | ||||
| } | ||||
| @@ -47,22 +47,6 @@ impl<T, P, B> Connection<T, P, B> | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn update_local_settings(&mut self, _local: frame::SettingSet) -> Result<(), ConnectionError> { | ||||
|         unimplemented!(); | ||||
|     } | ||||
|  | ||||
|     pub fn remote_initial_window_size(&self) -> u32 { | ||||
|         unimplemented!(); | ||||
|     } | ||||
|  | ||||
|     pub fn remote_max_concurrent_streams(&self) -> Option<usize> { | ||||
|         unimplemented!(); | ||||
|     } | ||||
|  | ||||
|     pub fn remote_push_enabled(&self) -> Option<bool> { | ||||
|         unimplemented!(); | ||||
|     } | ||||
|  | ||||
|     /// Returns `Ready` when the connection is ready to receive a frame. | ||||
|     pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { | ||||
|         try_ready!(self.poll_send_ready()); | ||||
| @@ -125,8 +109,6 @@ impl<T, P, B> Connection<T, P, B> | ||||
|                 Some(Settings(frame)) => { | ||||
|                     trace!("recv SETTINGS; frame={:?}", frame); | ||||
|                     self.settings.recv_settings(frame); | ||||
|  | ||||
|                     // TODO: ACK must be sent THEN settings applied. | ||||
|                 } | ||||
|                 Some(GoAway(frame)) => { | ||||
|                     // TODO: handle the last_stream_id. Also, should this be | ||||
| @@ -160,12 +142,10 @@ impl<T, P, B> Connection<T, P, B> | ||||
|     /// Returns `Ready` when the `Connection` is ready to receive a frame from | ||||
|     /// the socket. | ||||
|     fn poll_recv_ready(&mut self) -> Poll<(), ConnectionError> { | ||||
|         // Pong, settings ack, and stream refusals are high priority frames to | ||||
|         // send. If the write buffer is full, we stop reading any further frames | ||||
|         // until these high priority writes can be committed to the buffer. | ||||
|  | ||||
|         // The order of these calls don't really matter too much as only one | ||||
|         // should have pending work. | ||||
|         try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); | ||||
|         try_ready!(self.settings.send_pending_ack(&mut self.codec)); | ||||
|         try_ready!(self.settings.send_pending_ack(&mut self.codec, &mut self.streams)); | ||||
|         try_ready!(self.streams.send_pending_refusal(&mut self.codec)); | ||||
|  | ||||
|         Ok(().into()) | ||||
|   | ||||
| @@ -48,6 +48,10 @@ impl<T> FramedRead<T> { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn apply_remote_settings(&mut self, _settings: &frame::Settings) { | ||||
|         // TODO: Is this needed? | ||||
|     } | ||||
|  | ||||
|     fn decode_frame(&mut self, mut bytes: BytesMut) -> Result<Option<Frame>, ConnectionError> { | ||||
|         trace!("decoding frame from {}B", bytes.len()); | ||||
|  | ||||
| @@ -142,6 +146,10 @@ impl<T> FramedRead<T> { | ||||
|  | ||||
|         Ok(Some(frame)) | ||||
|     } | ||||
|  | ||||
|     pub fn get_mut(&mut self) -> &mut T { | ||||
|         self.inner.get_mut() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T> futures::Stream for FramedRead<T> | ||||
|   | ||||
| @@ -90,6 +90,14 @@ impl<T, B> FramedWrite<T, B> | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, B> FramedWrite<T, B> { | ||||
|     pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { | ||||
|         if let Some(val) = settings.max_frame_size() { | ||||
|             self.max_frame_size = val; | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, B> Sink for FramedWrite<T, B> | ||||
|     where T: AsyncWrite, | ||||
|           B: Buf, | ||||
|   | ||||
| @@ -1,3 +1,4 @@ | ||||
| mod codec; | ||||
| mod connection; | ||||
| mod framed_read; | ||||
| mod framed_write; | ||||
| @@ -8,6 +9,7 @@ mod streams; | ||||
| pub use self::connection::Connection; | ||||
| pub use self::streams::{Streams, StreamRef, Chunk}; | ||||
|  | ||||
| use self::codec::Codec; | ||||
| use self::framed_read::FramedRead; | ||||
| use self::framed_write::FramedWrite; | ||||
| use self::ping_pong::PingPong; | ||||
| @@ -52,10 +54,6 @@ pub struct WindowUpdate { | ||||
|     increment: WindowSize, | ||||
| } | ||||
|  | ||||
| type Codec<T, B> = | ||||
|     FramedRead< | ||||
|         FramedWrite<T, B>>; | ||||
|  | ||||
| // Constants | ||||
| pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; | ||||
| pub const MAX_WINDOW_SIZE: WindowSize = ::std::u32::MAX; | ||||
| @@ -90,7 +88,7 @@ pub fn from_framed_write<T, P, B>(framed_write: FramedWrite<T, B::Buf>) | ||||
|         .max_frame_length(frame::DEFAULT_MAX_FRAME_SIZE as usize) | ||||
|         .new_read(framed_write); | ||||
|  | ||||
|     let codec = FramedRead::new(framed); | ||||
|     let codec = Codec::from_framed(FramedRead::new(framed)); | ||||
|  | ||||
|     Connection::new(codec) | ||||
| } | ||||
|   | ||||
| @@ -3,13 +3,16 @@ use proto::*; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct Settings { | ||||
|     pending_ack: bool, | ||||
|     /// Received SETTINGS frame pending processing. The ACK must be written to | ||||
|     /// the socket first then the settings applied **before** receiving any | ||||
|     /// further frames. | ||||
|     pending: Option<frame::Settings>, | ||||
| } | ||||
|  | ||||
| impl Settings { | ||||
|     pub fn new() -> Self { | ||||
|         Settings { | ||||
|             pending_ack: false, | ||||
|             pending: None, | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -18,30 +21,31 @@ impl Settings { | ||||
|             debug!("received remote settings ack"); | ||||
|             // TODO: handle acks | ||||
|         } else { | ||||
|             assert!(!self.pending_ack); | ||||
|             self.pending_ack = true; | ||||
|             assert!(self.pending.is_none()); | ||||
|             self.pending = Some(frame); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn send_pending_ack<T, B>(&mut self, dst: &mut Codec<T, B>) | ||||
|     pub fn send_pending_ack<T, B>(&mut self, | ||||
|                                   dst: &mut Codec<T, B>, | ||||
|                                   streams: &mut Streams<B>) | ||||
|         -> Poll<(), ConnectionError> | ||||
|         where T: AsyncWrite, | ||||
|               B: Buf, | ||||
|     { | ||||
|         if self.pending_ack { | ||||
|         if let Some(ref settings) = self.pending { | ||||
|             let frame = frame::Settings::ack(); | ||||
|  | ||||
|             match dst.start_send(frame.into())? { | ||||
|                 AsyncSink::Ready => { | ||||
|                     self.pending_ack = false; | ||||
|                     return Ok(().into()); | ||||
|                 } | ||||
|                 AsyncSink::NotReady(_) => { | ||||
|                     return Ok(Async::NotReady); | ||||
|                 } | ||||
|             if let AsyncSink::NotReady(_) = dst.start_send(frame.into())? { | ||||
|                 return Ok(Async::NotReady); | ||||
|             } | ||||
|  | ||||
|             dst.apply_remote_settings(settings); | ||||
|             streams.apply_remote_settings(settings); | ||||
|         } | ||||
|  | ||||
|         self.pending = None; | ||||
|  | ||||
|         Ok(().into()) | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -48,6 +48,21 @@ impl FlowControl { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Reduce future capacity of the window. | ||||
|     /// | ||||
|     /// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE. | ||||
|     pub fn shrink_window(&mut self, dec: WindowSize) { | ||||
|         /* | ||||
|         if decr < self.next_window_update { | ||||
|             self.next_window_update -= decr | ||||
|         } else { | ||||
|             self.underflow += decr - self.next_window_update; | ||||
|             self.next_window_update = 0; | ||||
|         } | ||||
|         */ | ||||
|     } | ||||
|  | ||||
|  | ||||
|     /// Claims the provided amount from the window, if there is enough space. | ||||
|     /// | ||||
|     /// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than | ||||
|   | ||||
| @@ -221,14 +221,15 @@ impl<B> Recv<B> where B: Buf { | ||||
|  | ||||
|     pub fn recv_push_promise<P: Peer>(&mut self, | ||||
|                                       frame: frame::PushPromise, | ||||
|                                       stream: &mut store::Ptr<B>) | ||||
|                                       stream: store::Key, | ||||
|                                       store: &mut Store<B>) | ||||
|         -> Result<(), ConnectionError> | ||||
|     { | ||||
|         // First, make sure that the values are legit | ||||
|         self.ensure_can_reserve::<P>(frame.promised_id())?; | ||||
|  | ||||
|         // Make sure that the stream state is valid | ||||
|         stream.state.ensure_recv_open()?; | ||||
|         store[stream].state.ensure_recv_open()?; | ||||
|  | ||||
|         // TODO: Streams in the reserved states do not count towards the concurrency | ||||
|         // limit. However, it seems like there should be a cap otherwise this | ||||
| @@ -247,16 +248,18 @@ impl<B> Recv<B> where B: Buf { | ||||
|         let mut new_stream = Stream::new(frame.promised_id()); | ||||
|         new_stream.state.reserve_remote(); | ||||
|  | ||||
|         let mut ppp = stream.pending_push_promises.take(); | ||||
|         let mut ppp = store[stream].pending_push_promises.take(); | ||||
|  | ||||
|         { | ||||
|             // Store the stream | ||||
|             let mut new_stream = stream.store() | ||||
|             let mut new_stream = store | ||||
|                 .insert(frame.promised_id(), new_stream); | ||||
|  | ||||
|             ppp.push::<stream::Next>(&mut new_stream); | ||||
|         } | ||||
|  | ||||
|         let stream = &mut store[stream]; | ||||
|  | ||||
|         stream.pending_push_promises = ppp; | ||||
|         stream.notify_recv(); | ||||
|  | ||||
|   | ||||
| @@ -273,6 +273,63 @@ impl<B> Send<B> where B: Buf { | ||||
|         0 | ||||
|     } | ||||
|  | ||||
|     pub fn apply_remote_settings(&mut self, | ||||
|                                  settings: &frame::Settings, | ||||
|                                  store: &mut Store<B>) { | ||||
|         if let Some(val) = settings.max_concurrent_streams() { | ||||
|             self.max_streams = Some(val as usize); | ||||
|         } | ||||
|  | ||||
|         // Applies an update to the remote endpoint's initial window size. | ||||
|         // | ||||
|         // Per RFC 7540 §6.9.2: | ||||
|         // | ||||
|         // In addition to changing the flow-control window for streams that are | ||||
|         // not yet active, a SETTINGS frame can alter the initial flow-control | ||||
|         // window size for streams with active flow-control windows (that is, | ||||
|         // streams in the "open" or "half-closed (remote)" state). When the | ||||
|         // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust | ||||
|         // the size of all stream flow-control windows that it maintains by the | ||||
|         // difference between the new value and the old value. | ||||
|         // | ||||
|         // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available | ||||
|         // space in a flow-control window to become negative. A sender MUST | ||||
|         // track the negative flow-control window and MUST NOT send new | ||||
|         // flow-controlled frames until it receives WINDOW_UPDATE frames that | ||||
|         // cause the flow-control window to become positive. | ||||
|         if let Some(val) = settings.initial_window_size() { | ||||
|             let old_val = self.init_window_sz; | ||||
|             self.init_window_sz = val; | ||||
|  | ||||
|             if val < old_val { | ||||
|                 let dec = old_val - val; | ||||
|  | ||||
|                 store.for_each(|mut stream| { | ||||
|                     let stream = &mut *stream; | ||||
|  | ||||
|                     if let Some(flow) = stream.state.send_flow_control() { | ||||
|                         flow.shrink_window(val); | ||||
|  | ||||
|                         // Update the unadvertised number as well | ||||
|                         if stream.unadvertised_send_window < dec { | ||||
|                             stream.unadvertised_send_window = 0; | ||||
|                         } else { | ||||
|                             stream.unadvertised_send_window -= dec; | ||||
|                         } | ||||
|                     } | ||||
|                 }); | ||||
|             } else if val > old_val { | ||||
|                 let inc = val - old_val; | ||||
|  | ||||
|                 store.for_each(|mut stream| { | ||||
|                     if let Some(flow) = stream.state.send_flow_control() { | ||||
|                         unimplemented!(); | ||||
|                     } | ||||
|                 }); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), ConnectionError> { | ||||
|         if id >= self.next_stream_id { | ||||
|             return Err(ProtocolError.into()); | ||||
|   | ||||
| @@ -16,7 +16,7 @@ pub(super) struct Store<B> { | ||||
| /// "Pointer" to an entry in the store | ||||
| pub(super) struct Ptr<'a, B: 'a> { | ||||
|     key: Key, | ||||
|     store: &'a mut Store<B>, | ||||
|     slab: &'a mut slab::Slab<Stream<B>>, | ||||
| } | ||||
|  | ||||
| /// References an entry in the store. | ||||
| @@ -72,7 +72,7 @@ impl<B> Store<B> { | ||||
|     pub fn resolve(&mut self, key: Key) -> Ptr<B> { | ||||
|         Ptr { | ||||
|             key: key, | ||||
|             store: self, | ||||
|             slab: &mut self.slab, | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -80,7 +80,7 @@ impl<B> Store<B> { | ||||
|         if let Some(&key) = self.ids.get(id) { | ||||
|             Some(Ptr { | ||||
|                 key: Key(key), | ||||
|                 store: self, | ||||
|                 slab: &mut self.slab, | ||||
|             }) | ||||
|         } else { | ||||
|             None | ||||
| @@ -93,7 +93,7 @@ impl<B> Store<B> { | ||||
|  | ||||
|         Ptr { | ||||
|             key: Key(key), | ||||
|             store: self, | ||||
|             slab: &mut self.slab, | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -117,10 +117,13 @@ impl<B> Store<B> { | ||||
|     } | ||||
|  | ||||
|     pub fn for_each<F>(&mut self, mut f: F) | ||||
|         where F: FnMut(&mut Stream<B>) | ||||
|         where F: FnMut(Ptr<B>) | ||||
|     { | ||||
|         for &id in self.ids.values() { | ||||
|             f(&mut self.slab[id]) | ||||
|         for &key in self.ids.values() { | ||||
|             f(Ptr { | ||||
|                 key: Key(key), | ||||
|                 slab: &mut self.slab, | ||||
|             }); | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -265,19 +268,15 @@ impl<'a, B: 'a> Ptr<'a, B> { | ||||
|         self.key | ||||
|     } | ||||
|  | ||||
|     pub fn store(&mut self) -> &mut Store<B> { | ||||
|         &mut self.store | ||||
|     } | ||||
|  | ||||
|     pub fn resolve(&mut self, key: Key) -> Ptr<B> { | ||||
|         Ptr { | ||||
|             key: key, | ||||
|             store: self.store, | ||||
|             slab: &mut *self.slab, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn into_mut(self) -> &'a mut Stream<B> { | ||||
|         &mut self.store.slab[self.key.0] | ||||
|         &mut self.slab[self.key.0] | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -285,13 +284,13 @@ impl<'a, B: 'a> ops::Deref for Ptr<'a, B> { | ||||
|     type Target = Stream<B>; | ||||
|  | ||||
|     fn deref(&self) -> &Stream<B> { | ||||
|         &self.store.slab[self.key.0] | ||||
|         &self.slab[self.key.0] | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<'a, B: 'a> ops::DerefMut for Ptr<'a, B> { | ||||
|     fn deref_mut(&mut self) -> &mut Stream<B> { | ||||
|         &mut self.store.slab[self.key.0] | ||||
|         &mut self.slab[self.key.0] | ||||
|     } | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -154,7 +154,9 @@ impl<B> Streams<B> | ||||
|         let me = &mut *me; | ||||
|  | ||||
|         let actions = &mut me.actions; | ||||
|         me.store.for_each(|stream| actions.recv.recv_err(err, stream)); | ||||
|         me.store.for_each(|mut stream| { | ||||
|             actions.recv.recv_err(err, &mut *stream) | ||||
|         }); | ||||
|     } | ||||
|  | ||||
|     pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) | ||||
| @@ -189,11 +191,11 @@ impl<B> Streams<B> | ||||
|         let id = frame.stream_id(); | ||||
|  | ||||
|         let mut stream = match me.store.find_mut(&id) { | ||||
|             Some(stream) => stream, | ||||
|             Some(stream) => stream.key(), | ||||
|             None => return Err(ProtocolError.into()), | ||||
|         }; | ||||
|  | ||||
|         me.actions.recv.recv_push_promise::<P>(frame, &mut stream) | ||||
|         me.actions.recv.recv_push_promise::<P>(frame, stream, &mut me.store) | ||||
|     } | ||||
|  | ||||
|     pub fn next_incoming(&mut self) -> Option<StreamRef<B>> { | ||||
| @@ -247,6 +249,13 @@ impl<B> Streams<B> | ||||
|  | ||||
|         me.actions.send.poll_complete(&mut me.store, dst) | ||||
|     } | ||||
|  | ||||
|     pub fn apply_remote_settings(&mut self, frame: &frame::Settings) { | ||||
|         let mut me = self.inner.lock().unwrap(); | ||||
|         let me = &mut *me; | ||||
|  | ||||
|         me.actions.send.apply_remote_settings(frame, &mut me.store); | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<B> Streams<B> | ||||
|   | ||||
		Reference in New Issue
	
	Block a user