resets too
This commit is contained in:
		| @@ -2,25 +2,24 @@ use {ConnectionError, Frame}; | ||||
| use client::Client; | ||||
| use error; | ||||
| use frame::{self, SettingSet, StreamId}; | ||||
| use proto::{self, ControlFlow, ControlPing, ControlSettings, Peer, PingPayload, ReadySink, WindowSize}; | ||||
| use proto::*; | ||||
| use server::Server; | ||||
|  | ||||
| use bytes::{Bytes, IntoBuf}; | ||||
| use http::{request, response}; | ||||
| use futures::*; | ||||
| use tokio_io::{AsyncRead, AsyncWrite}; | ||||
| use std::marker::PhantomData; | ||||
|  | ||||
| /// An H2 connection | ||||
| #[derive(Debug)] | ||||
| pub struct Connection<T, P, B: IntoBuf = Bytes> { | ||||
|     inner: proto::Transport<T, P, B::Buf>, | ||||
|     inner: Transport<T, P, B::Buf>, | ||||
|     // Set to `true` as long as the connection is in a valid state. | ||||
|     active: bool, | ||||
|     _phantom: PhantomData<(P, B)>, | ||||
| } | ||||
|  | ||||
| pub fn new<T, P, B>(transport: proto::Transport<T, P, B::Buf>) | ||||
| pub fn new<T, P, B>(transport: Transport<T, P, B::Buf>) | ||||
|     -> Connection<T, P, B> | ||||
|     where T: AsyncRead + AsyncWrite, | ||||
|           P: Peer, | ||||
| @@ -178,6 +177,11 @@ impl<T, P, B> Stream for Connection<T, P, B> | ||||
|                     data: v.into_payload(), | ||||
|                 }, | ||||
|  | ||||
|                 Some(Reset(v)) => Frame::Reset { | ||||
|                     id: v.stream_id(), | ||||
|                     error: v.reason(), | ||||
|                 }, | ||||
|  | ||||
|                 Some(frame) => panic!("unexpected frame; frame={:?}", frame), | ||||
|                 None => return Ok(Async::Ready(None)), | ||||
|             }; | ||||
| @@ -213,6 +217,10 @@ impl<T, P, B> Sink for Connection<T, P, B> | ||||
|  | ||||
|         match item { | ||||
|             Frame::Headers { id, headers, end_of_stream } => { | ||||
|                 if self.inner.stream_is_reset(id) { | ||||
|                     return Err(error::User::StreamReset.into()); | ||||
|                 } | ||||
|  | ||||
|                 // This is a one-way conversion. By checking `poll_ready` first (above), | ||||
|                 // it's already been determined that the inner `Sink` can accept the item. | ||||
|                 // If the item is rejected, then there is a bug. | ||||
| @@ -222,20 +230,28 @@ impl<T, P, B> Sink for Connection<T, P, B> | ||||
|                 Ok(AsyncSink::Ready) | ||||
|             } | ||||
|  | ||||
|             Frame::Data { id, data, end_of_stream, .. } => { | ||||
|             Frame::Data { id, data, end_of_stream } => { | ||||
|                 if self.inner.stream_is_reset(id) { | ||||
|                     return Err(error::User::StreamReset.into()); | ||||
|                 } | ||||
|  | ||||
|                 let frame = frame::Data::from_buf(id, data.into_buf(), end_of_stream); | ||||
|                 let res = try!(self.inner.start_send(frame.into())); | ||||
|                 assert!(res.is_ready()); | ||||
|                 Ok(AsyncSink::Ready) | ||||
|             } | ||||
|  | ||||
|             Frame::Reset { id, error } => { | ||||
|                 let f = frame::Reset::new(id, error); | ||||
|                 let res = self.inner.start_send(f.into())?; | ||||
|                 assert!(res.is_ready()); | ||||
|                 Ok(AsyncSink::Ready) | ||||
|             } | ||||
|  | ||||
|             /* | ||||
|             Frame::Trailers { id, headers } => { | ||||
|                 unimplemented!(); | ||||
|             } | ||||
|             Frame::Body { id, chunk, end_of_stream } => { | ||||
|                 unimplemented!(); | ||||
|             } | ||||
|             Frame::PushPromise { id, promise } => { | ||||
|                 unimplemented!(); | ||||
|             } | ||||
|   | ||||
| @@ -76,26 +76,33 @@ impl<T: ControlStreams> FlowControl<T> { | ||||
|  | ||||
| /// Proxies access to streams. | ||||
| impl<T: ControlStreams> ControlStreams for FlowControl<T> { | ||||
|     #[inline] | ||||
|     fn streams(&self) -> &StreamMap { | ||||
|         self.inner.streams() | ||||
|     } | ||||
|  | ||||
|     #[inline] | ||||
|     fn streams_mut(&mut self) -> &mut StreamMap { | ||||
|         self.inner.streams_mut() | ||||
|     } | ||||
|  | ||||
|     fn stream_is_reset(&self, id: StreamId) -> bool { | ||||
|         self.inner.stream_is_reset(id) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Exposes a public upward API for flow control. | ||||
| impl<T: ControlStreams> ControlFlow for FlowControl<T> { | ||||
|     fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> { | ||||
|         if let Some(mut flow) = self.remote_flow_controller(id) { | ||||
|             if let Some(sz) = flow.apply_window_update() { | ||||
|                 return Ok(Async::Ready(sz)); | ||||
|         if self.stream_is_reset(id) { | ||||
|             return Err(error::User::StreamReset.into()); | ||||
|         } | ||||
|  | ||||
|         match self.remote_flow_controller(id) { | ||||
|             None => return Err(error::User::InvalidStreamId.into()), | ||||
|             Some(mut flow) => { | ||||
|                 if let Some(sz) = flow.apply_window_update() { | ||||
|                     return Ok(Async::Ready(sz)); | ||||
|                 } | ||||
|             } | ||||
|         } else { | ||||
|             return Err(error::User::InvalidStreamId.into()); | ||||
|         } | ||||
|  | ||||
|         self.blocked_remote_window_update = Some(task::current()); | ||||
| @@ -103,18 +110,26 @@ impl<T: ControlStreams> ControlFlow for FlowControl<T> { | ||||
|     } | ||||
|  | ||||
|     fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|         if let Some(mut fc) = self.local_flow_controller(id) { | ||||
|             fc.expand_window(incr); | ||||
|         } else { | ||||
|             return Err(error::User::InvalidStreamId.into()); | ||||
|         } | ||||
|         let added = match self.local_flow_controller(id) { | ||||
|             None => false, | ||||
|             Some(mut fc) => { | ||||
|                 fc.expand_window(incr); | ||||
|                 true | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         if id.is_zero() { | ||||
|             self.pending_local_connection_window_update = true; | ||||
|         if added {   | ||||
|             if id.is_zero() { | ||||
|                 self.pending_local_connection_window_update = true; | ||||
|             } else { | ||||
|                 self.pending_local_window_updates.push_back(id); | ||||
|             } | ||||
|             Ok(()) | ||||
|         } else if self.stream_is_reset(id) { | ||||
|             Err(error::User::StreamReset.into()) | ||||
|         } else { | ||||
|             self.pending_local_window_updates.push_back(id); | ||||
|             Err(error::User::InvalidStreamId.into()) | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -145,9 +160,11 @@ impl<T, U> FlowControl<T> | ||||
|         } | ||||
|  | ||||
|         while let Some(id) = self.pending_local_window_updates.pop_front() { | ||||
|             let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update()); | ||||
|             if let Some(incr) = update { | ||||
|                 try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); | ||||
|             if !self.stream_is_reset(id) { | ||||
|                 let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update()); | ||||
|                 if let Some(incr) = update { | ||||
|                     try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|   | ||||
| @@ -80,12 +80,12 @@ impl<T> FramedRead<T> { | ||||
|                 frame.into() | ||||
|             } | ||||
|  | ||||
|             Kind::Reset => { | ||||
|                 frame::Reset::load(head, &bytes[frame::HEADER_LEN..])?.into() | ||||
|             } | ||||
|  | ||||
|             // TODO | ||||
|  | ||||
|             Kind::Reset => { | ||||
|                 let _todo = try!(frame::Reset::load(head, &bytes[frame::HEADER_LEN..])); | ||||
|                 unimplemented!(); | ||||
|             } | ||||
|             Kind::GoAway => { | ||||
|                 let _todo = try!(frame::GoAway::load(&bytes[frame::HEADER_LEN..])); | ||||
|                 unimplemented!(); | ||||
|   | ||||
| @@ -145,6 +145,10 @@ impl<T, B> Sink for FramedWrite<T, B> | ||||
|                 v.encode(self.buf.get_mut()); | ||||
|                 trace!("encoded window_update; rem={:?}", self.buf.remaining()); | ||||
|             } | ||||
|             Frame::Reset(v) => { | ||||
|                 v.encode(self.buf.get_mut()); | ||||
|                 trace!("encoded reset; rem={:?}", self.buf.remaining()); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(AsyncSink::Ready) | ||||
|   | ||||
| @@ -128,6 +128,7 @@ pub trait ControlFlow { | ||||
| pub trait ControlStreams { | ||||
|     fn streams(&self)-> &StreamMap; | ||||
|     fn streams_mut(&mut self) -> &mut StreamMap; | ||||
|     fn stream_is_reset(&self, id: StreamId) -> bool; | ||||
| } | ||||
|  | ||||
| pub type PingPayload = [u8; 8]; | ||||
|   | ||||
| @@ -103,6 +103,10 @@ impl<T: ControlStreams> ControlStreams for Settings<T> { | ||||
|     fn streams_mut(&mut self) -> &mut StreamMap { | ||||
|         self.inner.streams_mut() | ||||
|     } | ||||
|  | ||||
|     fn stream_is_reset(&self, id: StreamId) -> bool { | ||||
|         self.inner.stream_is_reset(id) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: ControlFlow> ControlFlow for Settings<T> { | ||||
|   | ||||
| @@ -64,6 +64,14 @@ pub enum StreamState { | ||||
| } | ||||
|  | ||||
| impl StreamState { | ||||
|     pub fn is_closed(&self) ->  bool { | ||||
|         use self::StreamState::*; | ||||
|  | ||||
|         match self { | ||||
|             &Closed => true, | ||||
|             _ => false, | ||||
|         } | ||||
|     } | ||||
|     /// Transition the state to represent headers being received. | ||||
|     /// | ||||
|     /// Returns true if this state transition results in iniitializing the | ||||
| @@ -296,6 +304,10 @@ impl StreamMap { | ||||
|         self.inner.entry(id) | ||||
|     } | ||||
|  | ||||
|     pub fn remove(&mut self, id: &StreamId) -> Option<StreamState> { | ||||
|         self.inner.remove(id) | ||||
|     } | ||||
|  | ||||
|     pub fn shrink_all_local_windows(&mut self, decr: u32) { | ||||
|         for (_, mut s) in &mut self.inner { | ||||
|             if let Some(fc) = s.local_flow_controller() { | ||||
|   | ||||
| @@ -1,16 +1,21 @@ | ||||
| use ConnectionError; | ||||
| use {ConnectionError}; | ||||
| use error::Reason; | ||||
| use error::User; | ||||
| use frame::{self, Frame}; | ||||
| use proto::*; | ||||
|  | ||||
| use fnv::FnvHasher; | ||||
| use ordermap::OrderMap; | ||||
| use std::hash::BuildHasherDefault; | ||||
| use std::marker::PhantomData; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct StreamTracker<T, P> { | ||||
|     inner: T, | ||||
|     peer: PhantomData<P>, | ||||
|     streams: StreamMap, | ||||
|     active_streams: StreamMap, | ||||
|     // TODO reserved_streams: HashSet<StreamId> | ||||
|     reset_streams: OrderMap<StreamId, Reason, BuildHasherDefault<FnvHasher>>, | ||||
|     local_max_concurrency: Option<u32>, | ||||
|     remote_max_concurrency: Option<u32>, | ||||
|     initial_local_window_size: WindowSize, | ||||
| @@ -32,7 +37,8 @@ impl<T, P, U> StreamTracker<T, P> | ||||
|         StreamTracker { | ||||
|             inner, | ||||
|             peer: PhantomData, | ||||
|             streams: StreamMap::default(), | ||||
|             active_streams: StreamMap::default(), | ||||
|             reset_streams: OrderMap::default(), | ||||
|             local_max_concurrency, | ||||
|             remote_max_concurrency, | ||||
|             initial_local_window_size, | ||||
| @@ -44,12 +50,16 @@ impl<T, P, U> StreamTracker<T, P> | ||||
| impl<T, P> ControlStreams for StreamTracker<T, P> { | ||||
|     #[inline] | ||||
|     fn streams(&self) -> &StreamMap { | ||||
|         &self.streams | ||||
|         &self.active_streams | ||||
|     } | ||||
|  | ||||
|     #[inline] | ||||
|     fn streams_mut(&mut self) -> &mut StreamMap { | ||||
|         &mut self.streams | ||||
|         &mut self.active_streams | ||||
|     } | ||||
|  | ||||
|     fn stream_is_reset(&self, id: StreamId) -> bool { | ||||
|         self.reset_streams.contains_key(&id) | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -109,40 +119,93 @@ impl<T, P> Stream for StreamTracker<T, P> | ||||
|     fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> { | ||||
|         use frame::Frame::*; | ||||
|  | ||||
|         match try_ready!(self.inner.poll()) { | ||||
|             Some(Headers(v)) => { | ||||
|                 let id = v.stream_id(); | ||||
|                 let eos = v.is_end_stream(); | ||||
|         loop { | ||||
|             match try_ready!(self.inner.poll()) { | ||||
|                 Some(Headers(v)) => { | ||||
|                     let id = v.stream_id(); | ||||
|                     let eos = v.is_end_stream(); | ||||
|  | ||||
|                 let initialized = self.streams | ||||
|                     .entry(id) | ||||
|                     .or_insert_with(|| StreamState::default()) | ||||
|                     .recv_headers::<P>(eos, self.initial_local_window_size)?; | ||||
|                     if self.reset_streams.contains_key(&id) { | ||||
|                         continue; | ||||
|                     } | ||||
|  | ||||
|                 if initialized { | ||||
|                     // TODO: Ensure available capacity for a new stream | ||||
|                     // This won't be as simple as self.streams.len() as closed | ||||
|                     // connections should not be factored. | ||||
|                     let is_closed = { | ||||
|                         let stream = self.active_streams.entry(id) | ||||
|                             .or_insert_with(|| StreamState::default()); | ||||
|  | ||||
|                     if !P::is_valid_remote_stream_id(id) { | ||||
|                         return Err(Reason::ProtocolError.into()); | ||||
|                         let initialized = | ||||
|                             stream.recv_headers::<P>(eos, self.initial_local_window_size)?; | ||||
|  | ||||
|                         if initialized { | ||||
|                             // TODO: Ensure available capacity for a new stream | ||||
|                             // This won't be as simple as self.streams.len() as closed | ||||
|                             // connections should not be factored. | ||||
|  | ||||
|                             if !P::is_valid_remote_stream_id(id) { | ||||
|                                 return Err(Reason::ProtocolError.into()); | ||||
|                             } | ||||
|                         } | ||||
|  | ||||
|                         stream.is_closed() | ||||
|                     }; | ||||
|  | ||||
|                     if is_closed { | ||||
|                         self.active_streams.remove(&id); | ||||
|                         self.reset_streams.insert(id, Reason::NoError); | ||||
|                     } | ||||
|  | ||||
|                     return Ok(Async::Ready(Some(Headers(v)))); | ||||
|                 } | ||||
|  | ||||
|                 Some(Data(v)) => { | ||||
|                     let id = v.stream_id(); | ||||
|  | ||||
|                     if self.reset_streams.contains_key(&id) { | ||||
|                         continue; | ||||
|                     } | ||||
|  | ||||
|                     let is_closed = { | ||||
|                         let stream = match self.active_streams.get_mut(&id) { | ||||
|                             None => return Err(Reason::ProtocolError.into()), | ||||
|                             Some(s) => s, | ||||
|                         }; | ||||
|                         stream.recv_data(v.is_end_stream())?; | ||||
|                         stream.is_closed() | ||||
|                     }; | ||||
|  | ||||
|                     if is_closed { | ||||
|                         self.active_streams.remove(&id); | ||||
|                         self.reset_streams.insert(id, Reason::NoError); | ||||
|                     } | ||||
|  | ||||
|                     return Ok(Async::Ready(Some(Data(v)))); | ||||
|                 } | ||||
|  | ||||
|                 Some(Reset(v)) => { | ||||
|                     let id = v.stream_id(); | ||||
|  | ||||
|                     // Set or update the reset reason. | ||||
|                     self.reset_streams.insert(id, v.reason()); | ||||
|  | ||||
|                     if self.active_streams.remove(&id).is_some() { | ||||
|                         return Ok(Async::Ready(Some(Reset(v)))); | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 Ok(Async::Ready(Some(Headers(v)))) | ||||
|             } | ||||
|                 Some(f) => { | ||||
|                     let id = f.stream_id(); | ||||
|  | ||||
|             Some(Data(v)) => { | ||||
|                 match self.streams.get_mut(&v.stream_id()) { | ||||
|                     None => Err(Reason::ProtocolError.into()), | ||||
|                     Some(state) => { | ||||
|                         state.recv_data(v.is_end_stream())?; | ||||
|                         Ok(Async::Ready(Some(Data(v)))) | ||||
|                     if self.reset_streams.contains_key(&id) { | ||||
|                         continue; | ||||
|                     } | ||||
|  | ||||
|                     return Ok(Async::Ready(Some(f))); | ||||
|                 } | ||||
|  | ||||
|                 None => { | ||||
|                     return Ok(Async::Ready(None)); | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             f => Ok(Async::Ready(f)), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -158,6 +221,9 @@ impl<T, P, U> Sink for StreamTracker<T, P> | ||||
|     fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> { | ||||
|         use frame::Frame::*; | ||||
|  | ||||
|         // Must be enforced through higher levels. | ||||
|         debug_assert!(!self.stream_is_reset(item.stream_id())); | ||||
|  | ||||
|         match &item { | ||||
|             &Headers(ref v) => { | ||||
|                 let id = v.stream_id(); | ||||
| @@ -170,34 +236,51 @@ impl<T, P, U> Sink for StreamTracker<T, P> | ||||
|                 // | ||||
|                 // ACTUALLY(ver), maybe not? | ||||
|                 //   https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 | ||||
|                 let initialized = self.streams | ||||
|                     .entry(id) | ||||
|                     .or_insert_with(|| StreamState::default()) | ||||
|                     .send_headers::<P>(eos, self.initial_remote_window_size)?; | ||||
|  | ||||
|                 if initialized { | ||||
|                     // TODO: Ensure available capacity for a new stream | ||||
|                     // This won't be as simple as self.streams.len() as closed | ||||
|                     // connections should not be factored. | ||||
|                     if !P::is_valid_local_stream_id(id) { | ||||
|                         // TODO: clear state | ||||
|                         return Err(User::InvalidStreamId.into()); | ||||
|                 let is_closed = { | ||||
|                     let stream = self.active_streams.entry(id) | ||||
|                         .or_insert_with(|| StreamState::default()); | ||||
|  | ||||
|                     let initialized = | ||||
|                         stream.send_headers::<P>(eos, self.initial_remote_window_size)?; | ||||
|  | ||||
|                     if initialized { | ||||
|                         // TODO: Ensure available capacity for a new stream | ||||
|                         // This won't be as simple as self.streams.len() as closed | ||||
|                         // connections should not be factored. | ||||
|                         if !P::is_valid_local_stream_id(id) { | ||||
|                             // TODO: clear state | ||||
|                             return Err(User::InvalidStreamId.into()); | ||||
|                         } | ||||
|                     } | ||||
|  | ||||
|                     stream.is_closed() | ||||
|                 }; | ||||
|  | ||||
|                 if is_closed { | ||||
|                     self.active_streams.remove(&id); | ||||
|                     self.reset_streams.insert(id, Reason::NoError); | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             &Data(ref v) => { | ||||
|                 match self.streams.get_mut(&v.stream_id()) { | ||||
|                 match self.active_streams.get_mut(&v.stream_id()) { | ||||
|                     None => return Err(User::InactiveStreamId.into()), | ||||
|                     Some(state) => state.send_data(v.is_end_stream())?, | ||||
|                     Some(stream) => stream.send_data(v.is_end_stream())?, | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|  | ||||
|             &Reset(ref v) => { | ||||
|                 let id = v.stream_id(); | ||||
|                 self.active_streams.remove(&id); | ||||
|                 self.reset_streams.insert(id, v.reason()); | ||||
|             } | ||||
|  | ||||
|             _ => {} | ||||
|         } | ||||
|  | ||||
|         self.inner.start_send(item) | ||||
|  | ||||
|     } | ||||
|  | ||||
|     fn poll_complete(&mut self) -> Poll<(), T::SinkError> { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user