194 lines
4.9 KiB
Rust
194 lines
4.9 KiB
Rust
mod error;
|
|
mod framed_read;
|
|
mod framed_write;
|
|
|
|
pub use self::error::{RecvError, SendError, UserError};
|
|
|
|
use self::framed_read::FramedRead;
|
|
use self::framed_write::FramedWrite;
|
|
|
|
use frame::{self, Data, Frame};
|
|
|
|
use futures::*;
|
|
|
|
use tokio_io::{AsyncRead, AsyncWrite};
|
|
use tokio_io::codec::length_delimited;
|
|
|
|
use bytes::Buf;
|
|
|
|
use std::io;
|
|
|
|
#[derive(Debug)]
|
|
pub struct Codec<T, B> {
|
|
inner: FramedRead<FramedWrite<T, B>>,
|
|
}
|
|
|
|
impl<T, B> Codec<T, B>
|
|
where
|
|
T: AsyncRead + AsyncWrite,
|
|
B: Buf,
|
|
{
|
|
/// Returns a new `Codec` with the default max frame size
|
|
#[inline]
|
|
pub fn new(io: T) -> Self {
|
|
Self::with_max_recv_frame_size(io, frame::DEFAULT_MAX_FRAME_SIZE as usize)
|
|
}
|
|
|
|
/// Returns a new `Codec` with the given maximum frame size
|
|
pub fn with_max_recv_frame_size(io: T, max_frame_size: usize) -> Self {
|
|
// Wrap with writer
|
|
let framed_write = FramedWrite::new(io);
|
|
|
|
// Delimit the frames
|
|
let delimited = length_delimited::Builder::new()
|
|
.big_endian()
|
|
.length_field_length(3)
|
|
.length_adjustment(9)
|
|
.num_skip(0) // Don't skip the header
|
|
.new_read(framed_write);
|
|
|
|
let mut inner = FramedRead::new(delimited);
|
|
|
|
// Use FramedRead's method since it checks the value is within range.
|
|
inner.set_max_frame_size(max_frame_size);
|
|
|
|
Codec {
|
|
inner,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T, B> Codec<T, B> {
|
|
/// Updates the max received frame size.
|
|
///
|
|
/// The change takes effect the next time a frame is decoded. In other
|
|
/// words, if a frame is currently in process of being decoded with a frame
|
|
/// size greater than `val` but less than the max frame size in effect
|
|
/// before calling this function, then the frame will be allowed.
|
|
#[inline]
|
|
pub fn set_max_recv_frame_size(&mut self, val: usize) {
|
|
self.inner.set_max_frame_size(val)
|
|
}
|
|
|
|
/// Returns the current max received frame size setting.
|
|
///
|
|
/// This is the largest size this codec will accept from the wire. Larger
|
|
/// frames will be rejected.
|
|
#[cfg(feature = "unstable")]
|
|
#[inline]
|
|
pub fn max_recv_frame_size(&self) -> usize {
|
|
self.inner.max_frame_size()
|
|
}
|
|
|
|
/// Returns the max frame size that can be sent to the peer.
|
|
pub fn max_send_frame_size(&self) -> usize {
|
|
self.inner.get_ref().max_frame_size()
|
|
}
|
|
|
|
/// Set the peer's max frame size.
|
|
pub fn set_max_send_frame_size(&mut self, val: usize) {
|
|
self.framed_write().set_max_frame_size(val)
|
|
}
|
|
|
|
/// Get a reference to the inner stream.
|
|
#[cfg(feature = "unstable")]
|
|
pub fn get_ref(&self) -> &T {
|
|
self.inner.get_ref().get_ref()
|
|
}
|
|
|
|
/// Get a mutable reference to the inner stream.
|
|
pub fn get_mut(&mut self) -> &mut T {
|
|
self.inner.get_mut().get_mut()
|
|
}
|
|
|
|
/// Takes the data payload value that was fully written to the socket
|
|
pub(crate) fn take_last_data_frame(&mut self) -> Option<Data<B>> {
|
|
self.framed_write().take_last_data_frame()
|
|
}
|
|
|
|
fn framed_write(&mut self) -> &mut FramedWrite<T, B> {
|
|
self.inner.get_mut()
|
|
}
|
|
}
|
|
|
|
impl<T, B> Codec<T, B>
|
|
where
|
|
T: AsyncWrite,
|
|
B: Buf,
|
|
{
|
|
/// Returns `Ready` when the codec can buffer a frame
|
|
pub fn poll_ready(&mut self) -> Poll<(), io::Error> {
|
|
self.framed_write().poll_ready()
|
|
}
|
|
|
|
/// Buffer a frame.
|
|
///
|
|
/// `poll_ready` must be called first to ensure that a frame may be
|
|
/// accepted.
|
|
///
|
|
/// TODO: Rename this to avoid conflicts with Sink::buffer
|
|
pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
|
|
self.framed_write().buffer(item)
|
|
}
|
|
|
|
/// Flush buffered data to the wire
|
|
pub fn flush(&mut self) -> Poll<(), io::Error> {
|
|
self.framed_write().flush()
|
|
}
|
|
|
|
/// Shutdown the send half
|
|
pub fn shutdown(&mut self) -> Poll<(), io::Error> {
|
|
self.framed_write().shutdown()
|
|
}
|
|
}
|
|
|
|
impl<T, B> Stream for Codec<T, B>
|
|
where
|
|
T: AsyncRead,
|
|
{
|
|
type Item = Frame;
|
|
type Error = RecvError;
|
|
|
|
fn poll(&mut self) -> Poll<Option<Frame>, Self::Error> {
|
|
self.inner.poll()
|
|
}
|
|
}
|
|
|
|
impl<T, B> Sink for Codec<T, B>
|
|
where
|
|
T: AsyncWrite,
|
|
B: Buf,
|
|
{
|
|
type SinkItem = Frame<B>;
|
|
type SinkError = SendError;
|
|
|
|
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
|
|
if !self.poll_ready()?.is_ready() {
|
|
return Ok(AsyncSink::NotReady(item));
|
|
}
|
|
|
|
self.buffer(item)?;
|
|
Ok(AsyncSink::Ready)
|
|
}
|
|
|
|
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
|
|
self.flush()?;
|
|
Ok(Async::Ready(()))
|
|
}
|
|
|
|
fn close(&mut self) -> Poll<(), Self::SinkError> {
|
|
self.shutdown()?;
|
|
Ok(Async::Ready(()))
|
|
}
|
|
}
|
|
|
|
// TODO: remove (or improve) this
|
|
impl<T> From<T> for Codec<T, ::std::io::Cursor<::bytes::Bytes>>
|
|
where
|
|
T: AsyncRead + AsyncWrite,
|
|
{
|
|
fn from(src: T) -> Self {
|
|
Self::new(src)
|
|
}
|
|
}
|