More hackin
This commit is contained in:
@@ -4,7 +4,7 @@ use frame::{self, Frame, Kind};
|
||||
use tokio_io::AsyncWrite;
|
||||
|
||||
use futures::*;
|
||||
use bytes::{BytesMut, Buf};
|
||||
use bytes::{Bytes, BytesMut, Buf};
|
||||
|
||||
use std::io::{self, Write};
|
||||
|
||||
@@ -27,18 +27,8 @@ impl<T> FramedRead<T>
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for FramedRead<T>
|
||||
where T: Stream<Item = BytesMut, Error = io::Error>,
|
||||
{
|
||||
type Item = Frame;
|
||||
type Error = ConnectionError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
|
||||
let mut bytes = match try_ready!(self.inner.poll()) {
|
||||
Some(bytes) => bytes,
|
||||
None => return Ok(Async::Ready(None)),
|
||||
};
|
||||
|
||||
impl<T> FramedRead<T> {
|
||||
fn decode_frame(&mut self, mut bytes: Bytes) -> Result<Option<Frame>, ConnectionError> {
|
||||
// Parse the head
|
||||
let head = frame::Head::parse(&bytes);
|
||||
|
||||
@@ -58,12 +48,31 @@ impl<T> Stream for FramedRead<T>
|
||||
Kind::Continuation => unimplemented!(),
|
||||
Kind::Unknown => {
|
||||
let _ = bytes.split_to(frame::HEADER_LEN);
|
||||
frame::Unknown::new(head, bytes.freeze()).into()
|
||||
frame::Unknown::new(head, bytes).into()
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
};
|
||||
|
||||
Ok(Async::Ready(Some(frame)))
|
||||
Ok(Some(frame))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for FramedRead<T>
|
||||
where T: Stream<Item = BytesMut, Error = io::Error>,
|
||||
{
|
||||
type Item = Frame;
|
||||
type Error = ConnectionError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
|
||||
loop {
|
||||
let mut bytes = match try_ready!(self.inner.poll()) {
|
||||
Some(bytes) => bytes.freeze(),
|
||||
None => return Ok(Async::Ready(None)),
|
||||
};
|
||||
|
||||
if let Some(frame) = try!(self.decode_frame(bytes)) {
|
||||
return Ok(Async::Ready(Some(frame)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user