A lot of structural work
This commit is contained in:
		
							
								
								
									
										112
									
								
								src/client.rs
									
									
									
									
									
								
							
							
						
						
									
										112
									
								
								src/client.rs
									
									
									
									
									
								
							| @@ -16,13 +16,19 @@ pub struct Handshake<T, B: IntoBuf = Bytes> { | ||||
| } | ||||
|  | ||||
| #[derive(Debug)] | ||||
| struct Peer; | ||||
| pub(crate) struct Peer; | ||||
|  | ||||
| /// Marker type indicating a client peer | ||||
| pub struct Client<T, B: IntoBuf> { | ||||
|     connection: Connection<T, Peer, B>, | ||||
| } | ||||
|  | ||||
| /// Client half of an active HTTP/2.0 stream. | ||||
| pub struct Stream<B: IntoBuf> { | ||||
|     inner: proto::Stream<Peer>, | ||||
|     _p: ::std::marker::PhantomData<B>, | ||||
| } | ||||
|  | ||||
| impl<T> Client<T, Bytes> | ||||
|     where T: AsyncRead + AsyncWrite + 'static, | ||||
| { | ||||
| @@ -67,11 +73,82 @@ impl<T, B> Client<T, B> | ||||
|         Handshake { inner: Box::new(handshake) } | ||||
|     } | ||||
|  | ||||
|     pub fn request(&mut self) { | ||||
|     /// Returns `Ready` when the connection can initialize a new HTTP 2.0 | ||||
|     /// stream. | ||||
|     pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { | ||||
|         unimplemented!(); | ||||
|     } | ||||
|  | ||||
|     /// Send a request on a new HTTP 2.0 stream | ||||
|     pub fn request(&mut self, request: Request<()>, end_of_stream: bool) | ||||
|         -> Result<Stream<B>, ConnectionError> | ||||
|     { | ||||
|         self.connection.send_request(request, end_of_stream) | ||||
|             .map(|stream| Stream { | ||||
|                 inner: stream, | ||||
|                 _p: ::std::marker::PhantomData, | ||||
|             }) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, B> fmt::Debug for Client<T, B> | ||||
|     where T: fmt::Debug, | ||||
|           B: fmt::Debug + IntoBuf, | ||||
|           B::Buf: fmt::Debug + IntoBuf, | ||||
| { | ||||
|     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { | ||||
|         fmt.debug_struct("Client") | ||||
|             .field("connection", &self.connection) | ||||
|             .finish() | ||||
|     } | ||||
| } | ||||
|  | ||||
| // ===== impl Handshake ===== | ||||
|  | ||||
| impl<T, B: IntoBuf> Future for Handshake<T, B> { | ||||
|     type Item = Client<T, B>; | ||||
|     type Error = ConnectionError; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||
|         self.inner.poll() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, B> fmt::Debug for Handshake<T, B> | ||||
|     where T: fmt::Debug, | ||||
|           B: fmt::Debug + IntoBuf, | ||||
|           B::Buf: fmt::Debug + IntoBuf, | ||||
| { | ||||
|     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { | ||||
|         write!(fmt, "client::Handshake") | ||||
|     } | ||||
| } | ||||
|  | ||||
| // ===== impl Stream ===== | ||||
|  | ||||
| impl<B: IntoBuf> Stream<B> { | ||||
|     /// Receive the HTTP/2.0 response, if it is ready. | ||||
|     pub fn poll_response(&mut self) -> Poll<(), ConnectionError> { | ||||
|         unimplemented!(); | ||||
|     } | ||||
|  | ||||
|     /// Send data | ||||
|     pub fn send_data(&mut self, data: B, end_of_stream: bool) | ||||
|         -> Result<(), ConnectionError> | ||||
|     { | ||||
|         unimplemented!(); | ||||
|     } | ||||
|  | ||||
|     /// Send trailers | ||||
|     pub fn send_trailers(&mut self, trailers: ()) | ||||
|         -> Result<(), ConnectionError> | ||||
|     { | ||||
|         unimplemented!(); | ||||
|     } | ||||
| } | ||||
|  | ||||
| // ===== impl Peer ===== | ||||
|  | ||||
| impl proto::Peer for Peer { | ||||
|     type Send = Request<()>; | ||||
|     type Poll = Response<()>; | ||||
| @@ -109,34 +186,3 @@ impl proto::Peer for Peer { | ||||
|             .map_err(|_| ProtocolError.into()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, B: IntoBuf> Future for Handshake<T, B> { | ||||
|     type Item = Client<T, B>; | ||||
|     type Error = ConnectionError; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||
|         self.inner.poll() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, B> fmt::Debug for Handshake<T, B> | ||||
|     where T: fmt::Debug, | ||||
|           B: fmt::Debug + IntoBuf, | ||||
|           B::Buf: fmt::Debug + IntoBuf, | ||||
| { | ||||
|     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { | ||||
|         write!(fmt, "client::Handshake") | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, B> fmt::Debug for Client<T, B> | ||||
|     where T: fmt::Debug, | ||||
|           B: fmt::Debug + IntoBuf, | ||||
|           B::Buf: fmt::Debug + IntoBuf, | ||||
| { | ||||
|     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { | ||||
|         fmt.debug_struct("Client") | ||||
|             .field("connection", &self.connection) | ||||
|             .finish() | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -39,6 +39,10 @@ impl StreamId { | ||||
|     pub fn is_zero(&self) -> bool { | ||||
|         self.0 == 0 | ||||
|     } | ||||
|  | ||||
|     pub fn increment(&mut self) { | ||||
|         self.0 += 2; | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl From<u32> for StreamId { | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
| // #![allow(warnings)] | ||||
| #![deny(missing_debug_implementations)] | ||||
| #![allow(warnings)] | ||||
| // #![deny(missing_debug_implementations)] | ||||
|  | ||||
| #[macro_use] | ||||
| extern crate futures; | ||||
|   | ||||
| @@ -1,10 +1,10 @@ | ||||
| use {ConnectionError, Frame}; | ||||
| use {client, ConnectionError, Frame}; | ||||
| use HeaderMap; | ||||
| use frame::{self, StreamId}; | ||||
|  | ||||
| use proto::*; | ||||
|  | ||||
| use http::{request, response}; | ||||
| use http::{Request, Response}; | ||||
| use bytes::{Bytes, IntoBuf}; | ||||
| use tokio_io::{AsyncRead, AsyncWrite}; | ||||
|  | ||||
| @@ -80,6 +80,7 @@ impl<T, P, B> Connection<T, P, B> | ||||
|         unimplemented!(); | ||||
|     } | ||||
|  | ||||
|     /// Returns `Ready` when the connection is ready to receive a frame. | ||||
|     pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { | ||||
|         try_ready!(self.poll_send_ready()); | ||||
|  | ||||
| @@ -89,6 +90,7 @@ impl<T, P, B> Connection<T, P, B> | ||||
|         Ok(().into()) | ||||
|     } | ||||
|  | ||||
|     /* | ||||
|     pub fn send_data(self, | ||||
|                      id: StreamId, | ||||
|                      data: B, | ||||
| @@ -112,10 +114,7 @@ impl<T, P, B> Connection<T, P, B> | ||||
|             headers, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     pub fn start_ping(&mut self, _body: PingPayload) -> StartSend<PingPayload, ConnectionError> { | ||||
|         unimplemented!(); | ||||
|     } | ||||
|     */ | ||||
|  | ||||
|     // ===== Private ===== | ||||
|  | ||||
| @@ -227,6 +226,13 @@ impl<T, P, B> Connection<T, P, B> | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn poll_complete(&mut self) -> Poll<(), ConnectionError> { | ||||
|         try_ready!(self.poll_send_ready()); | ||||
|         try_ready!(self.codec.poll_complete()); | ||||
|  | ||||
|         Ok(().into()) | ||||
|     } | ||||
|  | ||||
|     fn convert_poll_message(frame: frame::Headers) -> Result<Frame<P::Poll>, ConnectionError> { | ||||
|         if frame.is_trailers() { | ||||
|             Ok(Frame::Trailers { | ||||
| @@ -243,6 +249,18 @@ impl<T, P, B> Connection<T, P, B> | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, B> Connection<T, client::Peer, B> | ||||
|     where T: AsyncRead + AsyncWrite, | ||||
|           B: IntoBuf, | ||||
| { | ||||
|     /// Initialize a new HTTP/2.0 stream and send the message. | ||||
|     pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) | ||||
|         -> Result<Stream<client::Peer>, ConnectionError> | ||||
|     { | ||||
|         self.streams.send_request(request, end_of_stream) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /* | ||||
| impl<T, B> Connection<T, Client, B> | ||||
|     where T: AsyncRead + AsyncWrite, | ||||
| @@ -292,20 +310,7 @@ impl<T, B> Connection<T, Server, B> | ||||
| } | ||||
| */ | ||||
|  | ||||
| impl<T, P, B> Stream for Connection<T, P, B> | ||||
|     where T: AsyncRead + AsyncWrite, | ||||
|           P: Peer, | ||||
|           B: IntoBuf, | ||||
| { | ||||
|     type Item = Frame<P::Poll>; | ||||
|     type Error = ConnectionError; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> { | ||||
|         // TODO: intercept errors and flag the connection | ||||
|         self.recv_frame() | ||||
|     } | ||||
| } | ||||
|  | ||||
| /* | ||||
| impl<T, P, B> Sink for Connection<T, P, B> | ||||
|     where T: AsyncRead + AsyncWrite, | ||||
|           P: Peer, | ||||
| @@ -379,11 +384,5 @@ impl<T, P, B> Sink for Connection<T, P, B> | ||||
|         // Return success | ||||
|         Ok(AsyncSink::Ready) | ||||
|     } | ||||
|  | ||||
|     fn poll_complete(&mut self) -> Poll<(), ConnectionError> { | ||||
|         try_ready!(self.poll_send_ready()); | ||||
|         try_ready!(self.codec.poll_complete()); | ||||
|  | ||||
|         Ok(().into()) | ||||
|     } | ||||
| } | ||||
| */ | ||||
|   | ||||
| @@ -117,7 +117,7 @@ impl<T: ApplySettings> ApplySettings for FramedRead<T> { | ||||
| } | ||||
| */ | ||||
|  | ||||
| impl<T> Stream for FramedRead<T> | ||||
| impl<T> futures::Stream for FramedRead<T> | ||||
|     where T: AsyncRead, | ||||
| { | ||||
|     type Item = Frame; | ||||
|   | ||||
| @@ -6,18 +6,18 @@ mod settings; | ||||
| mod streams; | ||||
|  | ||||
| pub use self::connection::Connection; | ||||
| pub use self::streams::{Streams, Stream}; | ||||
|  | ||||
| use self::framed_read::FramedRead; | ||||
| use self::framed_write::FramedWrite; | ||||
| use self::ping_pong::PingPong; | ||||
| use self::settings::Settings; | ||||
| use self::streams::Streams; | ||||
|  | ||||
| use {StreamId, ConnectionError}; | ||||
| use error::Reason; | ||||
| use frame::{self, Frame}; | ||||
|  | ||||
| use futures::*; | ||||
| use futures::{self, task, Poll, Async, AsyncSink, Sink, Stream as Stream2}; | ||||
| use bytes::{Buf, IntoBuf}; | ||||
| use tokio_io::{AsyncRead, AsyncWrite}; | ||||
| use tokio_io::codec::length_delimited; | ||||
|   | ||||
| @@ -6,7 +6,7 @@ mod store; | ||||
| mod stream; | ||||
| mod streams; | ||||
|  | ||||
| pub use self::streams::Streams; | ||||
| pub use self::streams::{Streams, Stream}; | ||||
|  | ||||
| use self::flow_control::FlowControl; | ||||
| use self::recv::Recv; | ||||
| @@ -19,6 +19,8 @@ use proto::*; | ||||
| use error::Reason::*; | ||||
| use error::User::*; | ||||
|  | ||||
| use http::{Request, Response}; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct Config { | ||||
|     /// Maximum number of remote initiated streams | ||||
|   | ||||
| @@ -17,6 +17,9 @@ pub struct Send<P> { | ||||
|     /// Current number of locally initiated streams | ||||
|     num_streams: usize, | ||||
|  | ||||
|     /// Stream identifier to use for next initialized stream. | ||||
|     next_stream_id: StreamId, | ||||
|  | ||||
|     /// Initial window size of locally initiated streams | ||||
|     init_window_sz: WindowSize, | ||||
|  | ||||
| @@ -37,9 +40,16 @@ pub struct Send<P> { | ||||
|  | ||||
| impl<P: Peer> Send<P> { | ||||
|     pub fn new(config: &Config) -> Self { | ||||
|         let next_stream_id = if P::is_server() { | ||||
|             2 | ||||
|         } else { | ||||
|             1 | ||||
|         }; | ||||
|  | ||||
|         Send { | ||||
|             max_streams: config.max_local_initiated, | ||||
|             num_streams: 0, | ||||
|             next_stream_id: next_stream_id.into(), | ||||
|             init_window_sz: config.init_local_window_sz, | ||||
|             flow_control: FlowControl::new(config.init_local_window_sz), | ||||
|             pending_window_updates: VecDeque::new(), | ||||
| @@ -51,8 +61,8 @@ impl<P: Peer> Send<P> { | ||||
|     /// Update state reflecting a new, locally opened stream | ||||
|     /// | ||||
|     /// Returns the stream state if successful. `None` if refused | ||||
|     pub fn open(&mut self, id: StreamId) -> Result<State, ConnectionError> { | ||||
|         try!(self.ensure_can_open(id)); | ||||
|     pub fn open(&mut self) -> Result<(StreamId, State), ConnectionError> { | ||||
|         try!(self.ensure_can_open()); | ||||
|  | ||||
|         if let Some(max) = self.max_streams { | ||||
|             if max <= self.num_streams { | ||||
| @@ -60,10 +70,13 @@ impl<P: Peer> Send<P> { | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         let ret = (self.next_stream_id, State::default()); | ||||
|  | ||||
|         // Increment the number of locally initiated streams | ||||
|         self.num_streams += 1; | ||||
|         self.next_stream_id.increment(); | ||||
|  | ||||
|         Ok(State::default()) | ||||
|         Ok(ret) | ||||
|     } | ||||
|  | ||||
|     pub fn send_headers(&mut self, state: &mut State, eos: bool) | ||||
| @@ -191,15 +204,13 @@ impl<P: Peer> Send<P> { | ||||
|     } | ||||
|  | ||||
|     /// Returns true if the local actor can initiate a stream with the given ID. | ||||
|     fn ensure_can_open(&self, id: StreamId) -> Result<(), ConnectionError> { | ||||
|     fn ensure_can_open(&self) -> Result<(), ConnectionError> { | ||||
|         if P::is_server() { | ||||
|             // Servers cannot open streams. PushPromise must first be reserved. | ||||
|             return Err(UnexpectedFrameType.into()); | ||||
|         } | ||||
|  | ||||
|         if !id.is_client_initiated() { | ||||
|             return Err(InvalidStreamId.into()); | ||||
|         } | ||||
|         // TODO: Handle StreamId overflow | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|   | ||||
| @@ -42,6 +42,11 @@ impl Store { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn insert(&mut self, id: StreamId, val: State) { | ||||
|         let handle = self.slab.insert(val); | ||||
|         assert!(self.ids.insert(id, handle).is_none()); | ||||
|     } | ||||
|  | ||||
|     pub fn entry(&mut self, id: StreamId) -> Entry { | ||||
|         use self::hash_map::Entry::*; | ||||
|  | ||||
|   | ||||
| @@ -1,3 +1,4 @@ | ||||
| use client; | ||||
| use proto::*; | ||||
| use super::*; | ||||
|  | ||||
| @@ -10,6 +11,12 @@ pub struct Streams<P> { | ||||
|     inner: Arc<Mutex<Inner<P>>>, | ||||
| } | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct Stream<P> { | ||||
|     inner: Arc<Mutex<Inner<P>>>, | ||||
|     id: StreamId, | ||||
| } | ||||
|  | ||||
| /// Fields needed to manage state related to managing the set of streams. This | ||||
| /// is mostly split out to make ownership happy. | ||||
| /// | ||||
| @@ -139,28 +146,22 @@ impl<P: Peer> Streams<P> { | ||||
|         unimplemented!(); | ||||
|     } | ||||
|  | ||||
|     pub fn send_headers(&mut self, frame: &frame::Headers) | ||||
|     pub fn send_headers(&mut self, headers: frame::Headers) | ||||
|         -> Result<(), ConnectionError> | ||||
|     { | ||||
|         unimplemented!(); | ||||
|         /* | ||||
|         let id = frame.stream_id(); | ||||
|         let mut me = self.inner.lock().unwrap(); | ||||
|         let me = &mut *me; | ||||
|  | ||||
|         trace!("send_headers; id={:?}", id); | ||||
|         // let (id, state) = me.actions.send.open()); | ||||
|  | ||||
|  | ||||
|         let state = match me.store.entry(id) { | ||||
|             Entry::Occupied(e) => e.into_mut(), | ||||
|             Entry::Vacant(e) => { | ||||
|                 // Trailers cannot open a stream. Trailers are header frames | ||||
|                 // that do not contain pseudo headers. Requests MUST contain a | ||||
|                 // method and responses MUST contain a status. If they do not,t | ||||
|                 // hey are considered to be malformed. | ||||
|                 if frame.is_trailers() { | ||||
|                     // TODO: Should this be a different error? | ||||
|                     return Err(UnexpectedFrameType.into()); | ||||
|                 } | ||||
|  | ||||
|                 let state = try!(me.actions.send.open(id)); | ||||
|                 let (id, state) = try!(me.actions.send.open()); | ||||
|                 e.insert(state) | ||||
|             } | ||||
|         }; | ||||
| @@ -176,6 +177,7 @@ impl<P: Peer> Streams<P> { | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|         */ | ||||
|     } | ||||
|  | ||||
|     pub fn send_data<B: Buf>(&mut self, frame: &frame::Data<B>) | ||||
| @@ -250,6 +252,40 @@ impl<P: Peer> Streams<P> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Streams<client::Peer> { | ||||
|     pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) | ||||
|         -> Result<Stream<client::Peer>, ConnectionError> | ||||
|     { | ||||
|         let id = { | ||||
|             let mut me = self.inner.lock().unwrap(); | ||||
|             let me = &mut *me; | ||||
|  | ||||
|             // Initialize a new stream. This fails if the connection is at capacity. | ||||
|             let (id, mut state) = me.actions.send.open()?; | ||||
|  | ||||
|             // Convert the message | ||||
|             let headers = client::Peer::convert_send_message( | ||||
|                 id, request, end_of_stream); | ||||
|  | ||||
|             me.actions.send.send_headers(&mut state, end_of_stream)?; | ||||
|  | ||||
|             // Given that the stream has been initialized, it should not be in the | ||||
|             // closed state. | ||||
|             debug_assert!(!state.is_closed()); | ||||
|  | ||||
|             // Store the state | ||||
|             me.store.insert(id, state); | ||||
|  | ||||
|             id | ||||
|         }; | ||||
|  | ||||
|         Ok(Stream { | ||||
|             inner: self.inner.clone(), | ||||
|             id: id, | ||||
|         }) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<P: Peer> Actions<P> { | ||||
|     fn dec_num_streams(&mut self, id: StreamId) { | ||||
|         if self.is_local_init(id) { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user