More work
This commit is contained in:
		| @@ -1,15 +1,26 @@ | ||||
| use ConnectionError; | ||||
| use frame::Frame; | ||||
|  | ||||
| use futures::*; | ||||
| use bytes::BytesMut; | ||||
| use tokio_io::AsyncWrite; | ||||
|  | ||||
| use std::io; | ||||
| use futures::*; | ||||
| use bytes::{BytesMut, Buf}; | ||||
|  | ||||
| use std::io::{self, Write}; | ||||
|  | ||||
| pub struct FramedRead<T> { | ||||
|     inner: T, | ||||
| } | ||||
|  | ||||
| impl<T> FramedRead<T> | ||||
|     where T: Stream<Item = BytesMut, Error = io::Error>, | ||||
|           T: AsyncWrite, | ||||
| { | ||||
|     pub fn new(inner: T) -> FramedRead<T> { | ||||
|         FramedRead { inner: inner } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T> Stream for FramedRead<T> | ||||
|     where T: Stream<Item = BytesMut, Error = io::Error>, | ||||
| { | ||||
| @@ -41,3 +52,22 @@ impl<T: Sink> Sink for FramedRead<T> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: io::Write> io::Write for FramedRead<T> { | ||||
|     fn write(&mut self, src: &[u8]) -> io::Result<usize> { | ||||
|         self.inner.write(src) | ||||
|     } | ||||
|  | ||||
|     fn flush(&mut self) -> io::Result<()> { | ||||
|         self.inner.flush() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: AsyncWrite> AsyncWrite for FramedRead<T> { | ||||
|     fn shutdown(&mut self) -> Poll<(), io::Error> { | ||||
|         self.inner.shutdown() | ||||
|     } | ||||
|  | ||||
|     fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { | ||||
|         self.inner.write_buf(buf) | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -87,3 +87,12 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> { | ||||
|         self.inner.shutdown().map_err(Into::into) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: Stream> Stream for FramedWrite<T> { | ||||
|     type Item = T::Item; | ||||
|     type Error = T::Error; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> { | ||||
|         self.inner.poll() | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -1,4 +1,64 @@ | ||||
| mod framed_read; | ||||
| mod framed_write; | ||||
|  | ||||
| pub use self::framed_read::FramedRead; | ||||
| use {frame, ConnectionError}; | ||||
| use self::framed_read::FramedRead; | ||||
| use self::framed_write::FramedWrite; | ||||
|  | ||||
| use tokio_io::{AsyncRead, AsyncWrite}; | ||||
| use tokio_io::codec::length_delimited; | ||||
|  | ||||
| use futures::*; | ||||
|  | ||||
| pub struct Connection<T> { | ||||
|     inner: Inner<T>, | ||||
| } | ||||
|  | ||||
| type Inner<T> = | ||||
|     FramedWrite< | ||||
|         FramedRead< | ||||
|             length_delimited::FramedRead<T>>>; | ||||
|  | ||||
| impl<T: AsyncRead + AsyncWrite> Connection<T> { | ||||
|     pub fn new(io: T) -> Connection<T> { | ||||
|         // Delimit the frames | ||||
|         let framed_read = length_delimited::Builder::new() | ||||
|             .big_endian() | ||||
|             .length_field_length(3) | ||||
|             .length_adjustment(6) | ||||
|             .num_skip(0) // Don't skip the header | ||||
|             .new_read(io); | ||||
|  | ||||
|         // Map to `Frame` types | ||||
|         let framed_read = FramedRead::new(framed_read); | ||||
|  | ||||
|         // Frame encoder | ||||
|         let framed = FramedWrite::new(framed_read); | ||||
|  | ||||
|         Connection { | ||||
|             inner: framed, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: AsyncRead + AsyncWrite> Stream for Connection<T> { | ||||
|     type Item = frame::Frame; | ||||
|     type Error = ConnectionError; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Option<frame::Frame>, ConnectionError> { | ||||
|         self.inner.poll() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: AsyncRead + AsyncWrite> Sink for Connection<T> { | ||||
|     type SinkItem = frame::Frame; | ||||
|     type SinkError = ConnectionError; | ||||
|  | ||||
|     fn start_send(&mut self, item: frame::Frame) -> StartSend<frame::Frame, ConnectionError> { | ||||
|         self.inner.start_send(item) | ||||
|     } | ||||
|  | ||||
|     fn poll_complete(&mut self) -> Poll<(), ConnectionError> { | ||||
|         self.inner.poll_complete() | ||||
|     } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user