WIP
This commit is contained in:
		
							
								
								
									
										86
									
								
								src/proto/connection.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										86
									
								
								src/proto/connection.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,86 @@ | ||||
| use {frame, proto, ConnectionError}; | ||||
| use tokio_io::{AsyncRead, AsyncWrite}; | ||||
| use tokio_io::codec::length_delimited; | ||||
|  | ||||
| use futures::*; | ||||
|  | ||||
| pub struct Connection<T> { | ||||
|     inner: Inner<T>, | ||||
| } | ||||
|  | ||||
| type Inner<T> = | ||||
|     proto::Settings< | ||||
|         proto::PingPong< | ||||
|             proto::FramedWrite< | ||||
|                 proto::FramedRead< | ||||
|                     length_delimited::FramedRead<T>>>>>; | ||||
|  | ||||
| impl<T: AsyncRead + AsyncWrite> Connection<T> { | ||||
|     pub fn new(io: T) -> Connection<T> { | ||||
|         // Delimit the frames | ||||
|         let framed_read = length_delimited::Builder::new() | ||||
|             .big_endian() | ||||
|             .length_field_length(3) | ||||
|             .length_adjustment(6) | ||||
|             .num_skip(0) // Don't skip the header | ||||
|             .new_read(io); | ||||
|  | ||||
|         // Map to `Frame` types | ||||
|         let framed_read = proto::FramedRead::new(framed_read); | ||||
|  | ||||
|         // Frame encoder | ||||
|         let mut framed = proto::FramedWrite::new(framed_read); | ||||
|  | ||||
|         // Ok, so this is a **little** hacky, but it works for now. | ||||
|         // | ||||
|         // The ping/pong behavior SHOULD be given highest priority (6.7). | ||||
|         // However, the connection handshake requires the settings frame to be | ||||
|         // sent as the very first one. This needs special handling because | ||||
|         // otherwise there is a race condition where the peer could send its | ||||
|         // settings frame followed immediately by a Ping, in which case, we | ||||
|         // don't want to accidentally send the pong before finishing the | ||||
|         // connection hand shake. | ||||
|         // | ||||
|         // So, to ensure correct ordering, we write the settings frame here | ||||
|         // before fully constructing the connection struct. Technically, `Async` | ||||
|         // operations should not be performed in `new` because this might not | ||||
|         // happen on a task, however we have full control of the I/O and we know | ||||
|         // that the settings frame will get buffered and not actually perform an | ||||
|         // I/O op. | ||||
|         let initial_settings = frame::SettingSet::default(); | ||||
|         let frame = frame::Settings::new(initial_settings.clone()); | ||||
|         assert!(framed.start_send(frame.into()).unwrap().is_ready()); | ||||
|  | ||||
|         // Add ping/pong handler | ||||
|         let ping_pong = proto::PingPong::new(framed); | ||||
|  | ||||
|         // Add settings handler | ||||
|         let connection = proto::Settings::new(ping_pong, initial_settings); | ||||
|  | ||||
|         Connection { | ||||
|             inner: connection, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: AsyncRead + AsyncWrite> Stream for Connection<T> { | ||||
|     type Item = frame::Frame; | ||||
|     type Error = ConnectionError; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Option<frame::Frame>, ConnectionError> { | ||||
|         self.inner.poll() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: AsyncRead + AsyncWrite> Sink for Connection<T> { | ||||
|     type SinkItem = frame::Frame; | ||||
|     type SinkError = ConnectionError; | ||||
|  | ||||
|     fn start_send(&mut self, item: frame::Frame) -> StartSend<frame::Frame, ConnectionError> { | ||||
|         self.inner.start_send(item) | ||||
|     } | ||||
|  | ||||
|     fn poll_complete(&mut self) -> Poll<(), ConnectionError> { | ||||
|         self.inner.poll_complete() | ||||
|     } | ||||
| } | ||||
| @@ -1,65 +1,11 @@ | ||||
| mod connection; | ||||
| mod framed_read; | ||||
| mod framed_write; | ||||
| mod ping_pong; | ||||
| mod settings; | ||||
|  | ||||
| use {frame, ConnectionError}; | ||||
| use self::framed_read::FramedRead; | ||||
| use self::framed_write::FramedWrite; | ||||
|  | ||||
| use tokio_io::{AsyncRead, AsyncWrite}; | ||||
| use tokio_io::codec::length_delimited; | ||||
|  | ||||
| use futures::*; | ||||
|  | ||||
| pub struct Connection<T> { | ||||
|     inner: Inner<T>, | ||||
| } | ||||
|  | ||||
| type Inner<T> = | ||||
|     FramedWrite< | ||||
|         FramedRead< | ||||
|             length_delimited::FramedRead<T>>>; | ||||
|  | ||||
| impl<T: AsyncRead + AsyncWrite> Connection<T> { | ||||
|     pub fn new(io: T) -> Connection<T> { | ||||
|         // Delimit the frames | ||||
|         let framed_read = length_delimited::Builder::new() | ||||
|             .big_endian() | ||||
|             .length_field_length(3) | ||||
|             .length_adjustment(6) | ||||
|             .num_skip(0) // Don't skip the header | ||||
|             .new_read(io); | ||||
|  | ||||
|         // Map to `Frame` types | ||||
|         let framed_read = FramedRead::new(framed_read); | ||||
|  | ||||
|         // Frame encoder | ||||
|         let framed = FramedWrite::new(framed_read); | ||||
|  | ||||
|         Connection { | ||||
|             inner: framed, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: AsyncRead + AsyncWrite> Stream for Connection<T> { | ||||
|     type Item = frame::Frame; | ||||
|     type Error = ConnectionError; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Option<frame::Frame>, ConnectionError> { | ||||
|         self.inner.poll() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: AsyncRead + AsyncWrite> Sink for Connection<T> { | ||||
|     type SinkItem = frame::Frame; | ||||
|     type SinkError = ConnectionError; | ||||
|  | ||||
|     fn start_send(&mut self, item: frame::Frame) -> StartSend<frame::Frame, ConnectionError> { | ||||
|         self.inner.start_send(item) | ||||
|     } | ||||
|  | ||||
|     fn poll_complete(&mut self) -> Poll<(), ConnectionError> { | ||||
|         self.inner.poll_complete() | ||||
|     } | ||||
| } | ||||
| pub use self::connection::Connection; | ||||
| pub use self::framed_read::FramedRead; | ||||
| pub use self::framed_write::FramedWrite; | ||||
| pub use self::ping_pong::PingPong; | ||||
| pub use self::settings::Settings; | ||||
|   | ||||
							
								
								
									
										47
									
								
								src/proto/ping_pong.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								src/proto/ping_pong.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,47 @@ | ||||
| use ConnectionError; | ||||
| use frame::Frame; | ||||
|  | ||||
| use futures::*; | ||||
|  | ||||
| pub struct PingPong<T> { | ||||
|     inner: T, | ||||
| } | ||||
|  | ||||
| impl<T> PingPong<T> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           T: Sink<SinkItem = Frame, SinkError = ConnectionError>, | ||||
| { | ||||
|     pub fn new(inner: T) -> PingPong<T> { | ||||
|         PingPong { | ||||
|             inner: inner, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T> Stream for PingPong<T> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           T: Sink<SinkItem = Frame, SinkError = ConnectionError>, | ||||
| { | ||||
|     type Item = Frame; | ||||
|     type Error = ConnectionError; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> { | ||||
|         self.inner.poll() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T> Sink for PingPong<T> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           T: Sink<SinkItem = Frame, SinkError = ConnectionError>, | ||||
| { | ||||
|     type SinkItem = Frame; | ||||
|     type SinkError = ConnectionError; | ||||
|  | ||||
|     fn start_send(&mut self, item: Frame) -> StartSend<Frame, ConnectionError> { | ||||
|         self.inner.start_send(item) | ||||
|     } | ||||
|  | ||||
|     fn poll_complete(&mut self) -> Poll<(), ConnectionError> { | ||||
|         self.inner.poll_complete() | ||||
|     } | ||||
| } | ||||
| @@ -15,19 +15,22 @@ pub struct Settings<T> { | ||||
|     is_dirty: bool, | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * TODO: | ||||
|  * - Settings ack timeout for connection error | ||||
|  */ | ||||
|  | ||||
| impl<T> Settings<T> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           T: Sink<SinkItem = Frame, SinkError = ConnectionError>, | ||||
| { | ||||
|     pub fn new(inner: T) -> Settings<T> { | ||||
|     pub fn new(inner: T, local: frame::SettingSet) -> Settings<T> { | ||||
|         Settings { | ||||
|             inner: inner, | ||||
|             local: frame::SettingSet::default(), | ||||
|             local: local, | ||||
|             remote: frame::SettingSet::default(), | ||||
|             remaining_acks: 0, | ||||
|             // Always start in the dirty state as sending the settings frame is | ||||
|             // part of the connection handshake | ||||
|             is_dirty: true, | ||||
|             is_dirty: false, | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -37,14 +40,20 @@ impl<T> Settings<T> | ||||
|  | ||||
|     fn try_send_pending(&mut self) -> Poll<(), ConnectionError> { | ||||
|         if self.is_dirty { | ||||
|             // Create the new frame | ||||
|             let frame = frame::Settings::new(self.local.clone()).into(); | ||||
|             try_ready!(self.try_send(frame)); | ||||
|  | ||||
|             self.is_dirty = false; | ||||
|         } | ||||
|  | ||||
|         unimplemented!(); | ||||
|         while self.remaining_acks > 0 { | ||||
|             let frame = frame::Settings::ack().into(); | ||||
|             try_ready!(self.try_send(frame)); | ||||
|  | ||||
|             self.remaining_acks -= 1; | ||||
|         } | ||||
|  | ||||
|         Ok(Async::Ready(())) | ||||
|     } | ||||
|  | ||||
|     fn try_send(&mut self, item: frame::Frame) -> Poll<(), ConnectionError> { | ||||
| @@ -82,13 +91,8 @@ impl<T> Sink for Settings<T> | ||||
|         // Settings frames take priority, so `item` cannot be sent if there are | ||||
|         // any pending acks OR the local settings have been changed w/o sending | ||||
|         // an associated frame. | ||||
|         if self.has_pending_sends() { | ||||
|             // Try to flush them | ||||
|             try!(self.poll_complete()); | ||||
|  | ||||
|             if self.has_pending_sends() { | ||||
|                 return Ok(AsyncSink::NotReady(item)); | ||||
|             } | ||||
|         if !try!(self.try_send_pending()).is_ready() { | ||||
|             return Ok(AsyncSink::NotReady(item)); | ||||
|         } | ||||
|  | ||||
|         self.inner.start_send(item) | ||||
| @@ -99,8 +103,8 @@ impl<T> Sink for Settings<T> | ||||
|     } | ||||
|  | ||||
|     fn close(&mut self) -> Poll<(), ConnectionError> { | ||||
|         if self.has_pending_sends() { | ||||
|             try_ready!(self.poll_complete()); | ||||
|         if !try!(self.try_send_pending()).is_ready() { | ||||
|             return Ok(Async::NotReady); | ||||
|         } | ||||
|  | ||||
|         self.inner.close() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user