diff --git a/src/lib.rs b/src/lib.rs index 96a0ce1..4994445 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,49 +11,4 @@ pub mod proto; pub mod frame; pub use error::{ConnectionError, StreamError, Reason}; - -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::length_delimited; - -use futures::*; - -pub struct Transport { - inner: length_delimited::FramedRead, -} - -impl Transport { - pub fn bind(io: T) -> Transport { - let framed = length_delimited::Builder::new() - .big_endian() - .length_field_length(3) - .length_adjustment(6) - .num_skip(0) // Don't skip the header - .new_read(io); - - Transport { - inner: framed, - } - } -} - -impl Stream for Transport { - type Item = frame::Frame; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, ConnectionError> { - unimplemented!(); - } -} - -impl Sink for Transport { - type SinkItem = frame::Frame; - type SinkError = ConnectionError; - - fn start_send(&mut self, item: frame::Frame) -> StartSend { - unimplemented!(); - } - - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - unimplemented!(); - } -} +pub use proto::Connection; diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index cf67b89..5abe602 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -1,15 +1,26 @@ use ConnectionError; use frame::Frame; -use futures::*; -use bytes::BytesMut; +use tokio_io::AsyncWrite; -use std::io; +use futures::*; +use bytes::{BytesMut, Buf}; + +use std::io::{self, Write}; pub struct FramedRead { inner: T, } +impl FramedRead + where T: Stream, + T: AsyncWrite, +{ + pub fn new(inner: T) -> FramedRead { + FramedRead { inner: inner } + } +} + impl Stream for FramedRead where T: Stream, { @@ -41,3 +52,22 @@ impl Sink for FramedRead { } } +impl io::Write for FramedRead { + fn write(&mut self, src: &[u8]) -> io::Result { + self.inner.write(src) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} + +impl AsyncWrite for FramedRead { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.inner.shutdown() + } + + fn write_buf(&mut self, buf: &mut B) -> Poll { + self.inner.write_buf(buf) + } +} diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index 4b95f2a..83d60cb 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -87,3 +87,12 @@ impl Sink for FramedWrite { self.inner.shutdown().map_err(Into::into) } } + +impl Stream for FramedWrite { + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll, T::Error> { + self.inner.poll() + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index daa3d73..67922eb 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,4 +1,64 @@ mod framed_read; mod framed_write; -pub use self::framed_read::FramedRead; +use {frame, ConnectionError}; +use self::framed_read::FramedRead; +use self::framed_write::FramedWrite; + +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::length_delimited; + +use futures::*; + +pub struct Connection { + inner: Inner, +} + +type Inner = + FramedWrite< + FramedRead< + length_delimited::FramedRead>>; + +impl Connection { + pub fn new(io: T) -> Connection { + // Delimit the frames + let framed_read = length_delimited::Builder::new() + .big_endian() + .length_field_length(3) + .length_adjustment(6) + .num_skip(0) // Don't skip the header + .new_read(io); + + // Map to `Frame` types + let framed_read = FramedRead::new(framed_read); + + // Frame encoder + let framed = FramedWrite::new(framed_read); + + Connection { + inner: framed, + } + } +} + +impl Stream for Connection { + type Item = frame::Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + self.inner.poll() + } +} + +impl Sink for Connection { + type SinkItem = frame::Frame; + type SinkError = ConnectionError; + + fn start_send(&mut self, item: frame::Frame) -> StartSend { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_complete() + } +}