Update lib to std-future

This commit is contained in:
Gurwinder Singh
2019-08-15 08:25:14 +05:30
committed by Sean McArthur
parent 782f1f712c
commit c8fefd49f1
19 changed files with 1125 additions and 1038 deletions

View File

@@ -1,24 +1,29 @@
use crate::codec::RecvError;
use crate::frame::{self, Frame, Kind, Reason};
use crate::frame::{DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE};
use crate::frame::{
DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE,
};
use crate::hpack;
use futures::*;
use futures::{ready, Stream};
use bytes::BytesMut;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_codec::{LengthDelimitedCodec, LengthDelimitedCodecError};
use tokio_codec::FramedRead as InnerFramedRead;
use tokio_io::AsyncRead;
use tokio_io::codec::length_delimited;
// 16 MB "sane default" taken from golang http2
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: usize = 16 << 20;
#[derive(Debug)]
pub struct FramedRead<T> {
inner: length_delimited::FramedRead<T>,
inner: InnerFramedRead<T, LengthDelimitedCodec>,
// hpack decoder state
hpack: hpack::Decoder,
@@ -45,7 +50,7 @@ enum Continuable {
}
impl<T> FramedRead<T> {
pub fn new(inner: length_delimited::FramedRead<T>) -> FramedRead<T> {
pub fn new(inner: InnerFramedRead<T, LengthDelimitedCodec>) -> FramedRead<T> {
FramedRead {
inner: inner,
hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
@@ -138,24 +143,27 @@ impl<T> FramedRead<T> {
res.map_err(|e| {
proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR)
})?.into()
},
})?
.into()
}
Kind::Ping => {
let res = frame::Ping::load(head, &bytes[frame::HEADER_LEN..]);
res.map_err(|e| {
proto_err!(conn: "failed to load PING frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR)
})?.into()
},
})?
.into()
}
Kind::WindowUpdate => {
let res = frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..]);
res.map_err(|e| {
proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR)
})?.into()
},
})?
.into()
}
Kind::Data => {
let _ = bytes.split_to(frame::HEADER_LEN);
let res = frame::Data::load(head, bytes.freeze());
@@ -164,28 +172,27 @@ impl<T> FramedRead<T> {
res.map_err(|e| {
proto_err!(conn: "failed to load DATA frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR)
})?.into()
},
Kind::Headers => {
header_block!(Headers, head, bytes)
},
})?
.into()
}
Kind::Headers => header_block!(Headers, head, bytes),
Kind::Reset => {
let res = frame::Reset::load(head, &bytes[frame::HEADER_LEN..]);
res.map_err(|e| {
proto_err!(conn: "failed to load RESET frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR)
})?.into()
},
})?
.into()
}
Kind::GoAway => {
let res = frame::GoAway::load(&bytes[frame::HEADER_LEN..]);
res.map_err(|e| {
proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e);
Connection(Reason::PROTOCOL_ERROR)
})?.into()
},
Kind::PushPromise => {
header_block!(PushPromise, head, bytes)
},
})?
.into()
}
Kind::PushPromise => header_block!(PushPromise, head, bytes),
Kind::Priority => {
if head.stream_id() == 0 {
// Invalid stream identifier
@@ -205,13 +212,13 @@ impl<T> FramedRead<T> {
id,
reason: Reason::PROTOCOL_ERROR,
});
},
}
Err(e) => {
proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e);
return Err(Connection(Reason::PROTOCOL_ERROR));
}
}
},
}
Kind::Continuation => {
let is_end_headers = (head.flag() & 0x4) == 0x4;
@@ -229,8 +236,6 @@ impl<T> FramedRead<T> {
return Err(Connection(Reason::PROTOCOL_ERROR));
}
// Extend the buf
if partial.buf.is_empty() {
partial.buf = bytes.split_off(frame::HEADER_LEN);
@@ -257,9 +262,14 @@ impl<T> FramedRead<T> {
partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]);
}
match partial.frame.load_hpack(&mut partial.buf, self.max_header_list_size, &mut self.hpack) {
Ok(_) => {},
Err(frame::Error::Hpack(hpack::DecoderError::NeedMore(_))) if !is_end_headers => {},
match partial.frame.load_hpack(
&mut partial.buf,
self.max_header_list_size,
&mut self.hpack,
) {
Ok(_) => {}
Err(frame::Error::Hpack(hpack::DecoderError::NeedMore(_)))
if !is_end_headers => {}
Err(frame::Error::MalformedMessage) => {
let id = head.stream_id();
proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id);
@@ -267,11 +277,11 @@ impl<T> FramedRead<T> {
id,
reason: Reason::PROTOCOL_ERROR,
});
},
}
Err(e) => {
proto_err!(conn: "failed HPACK decoding; err={:?}", e);
return Err(Connection(Reason::PROTOCOL_ERROR));
},
}
}
if is_end_headers {
@@ -280,11 +290,11 @@ impl<T> FramedRead<T> {
self.partial = Some(partial);
return Ok(None);
}
},
}
Kind::Unknown => {
// Unknown frames are ignored
return Ok(None);
},
}
};
Ok(Some(frame))
@@ -302,7 +312,7 @@ impl<T> FramedRead<T> {
#[cfg(feature = "unstable")]
#[inline]
pub fn max_frame_size(&self) -> usize {
self.inner.max_frame_length()
self.inner.decoder().max_frame_length()
}
/// Updates the max frame size setting.
@@ -311,7 +321,7 @@ impl<T> FramedRead<T> {
#[inline]
pub fn set_max_frame_size(&mut self, val: usize) {
assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize);
self.inner.set_max_frame_length(val)
self.inner.decoder_mut().set_max_frame_length(val)
}
/// Update the max header list size setting.
@@ -323,34 +333,32 @@ impl<T> FramedRead<T> {
impl<T> Stream for FramedRead<T>
where
T: AsyncRead,
T: AsyncRead + Unpin,
{
type Item = Frame;
type Error = RecvError;
type Item = Result<Frame, RecvError>;
fn poll(&mut self) -> Poll<Option<Frame>, Self::Error> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
log::trace!("poll");
let bytes = match try_ready!(self.inner.poll().map_err(map_err)) {
Some(bytes) => bytes,
None => return Ok(Async::Ready(None)),
let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
Some(Ok(bytes)) => bytes,
Some(Err(e)) => return Poll::Ready(Some(Err(map_err(e)))),
None => return Poll::Ready(None),
};
log::trace!("poll; bytes={}B", bytes.len());
if let Some(frame) = self.decode_frame(bytes)? {
log::debug!("received; frame={:?}", frame);
return Ok(Async::Ready(Some(frame)));
return Poll::Ready(Some(Ok(frame)));
}
}
}
}
fn map_err(err: io::Error) -> RecvError {
use tokio_io::codec::length_delimited::FrameTooBig;
if let io::ErrorKind::InvalidData = err.kind() {
if let Some(custom) = err.get_ref() {
if custom.is::<FrameTooBig>() {
if custom.is::<LengthDelimitedCodecError>() {
return RecvError::Connection(Reason::FRAME_SIZE_ERROR);
}
}

View File

@@ -4,8 +4,10 @@ use crate::frame::{self, Frame, FrameSize};
use crate::hpack;
use bytes::{Buf, BufMut, BytesMut};
use futures::*;
use tokio_io::{AsyncRead, AsyncWrite, try_nb};
use futures::ready;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use std::io::{self, Cursor};
@@ -55,12 +57,12 @@ const CHAIN_THRESHOLD: usize = 256;
// TODO: Make generic
impl<T, B> FramedWrite<T, B>
where
T: AsyncWrite,
T: AsyncWrite + Unpin,
B: Buf,
{
pub fn new(inner: T) -> FramedWrite<T, B> {
FramedWrite {
inner: inner,
inner,
hpack: hpack::Encoder::default(),
buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)),
next: None,
@@ -73,17 +75,17 @@ where
///
/// Calling this function may result in the current contents of the buffer
/// to be flushed to `T`.
pub fn poll_ready(&mut self) -> Poll<(), io::Error> {
pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
if !self.has_capacity() {
// Try flushing
self.flush()?;
ready!(self.flush(cx))?;
if !self.has_capacity() {
return Ok(Async::NotReady);
return Poll::Pending;
}
}
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
}
/// Buffer a frame.
@@ -123,33 +125,33 @@ where
// Save off the last frame...
self.last_data_frame = Some(v);
}
},
}
Frame::Headers(v) => {
if let Some(continuation) = v.encode(&mut self.hpack, self.buf.get_mut()) {
self.next = Some(Next::Continuation(continuation));
}
},
}
Frame::PushPromise(v) => {
if let Some(continuation) = v.encode(&mut self.hpack, self.buf.get_mut()) {
self.next = Some(Next::Continuation(continuation));
}
},
}
Frame::Settings(v) => {
v.encode(self.buf.get_mut());
log::trace!("encoded settings; rem={:?}", self.buf.remaining());
},
}
Frame::GoAway(v) => {
v.encode(self.buf.get_mut());
log::trace!("encoded go_away; rem={:?}", self.buf.remaining());
},
}
Frame::Ping(v) => {
v.encode(self.buf.get_mut());
log::trace!("encoded ping; rem={:?}", self.buf.remaining());
},
}
Frame::WindowUpdate(v) => {
v.encode(self.buf.get_mut());
log::trace!("encoded window_update; rem={:?}", self.buf.remaining());
},
}
Frame::Priority(_) => {
/*
@@ -157,18 +159,18 @@ where
log::trace!("encoded priority; rem={:?}", self.buf.remaining());
*/
unimplemented!();
},
}
Frame::Reset(v) => {
v.encode(self.buf.get_mut());
log::trace!("encoded reset; rem={:?}", self.buf.remaining());
},
}
}
Ok(())
}
/// Flush buffered data to the wire
pub fn flush(&mut self) -> Poll<(), io::Error> {
pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
log::trace!("flush");
loop {
@@ -177,12 +179,12 @@ where
Some(Next::Data(ref mut frame)) => {
log::trace!(" -> queued data frame");
let mut buf = Buf::by_ref(&mut self.buf).chain(frame.payload_mut());
try_ready!(self.inner.write_buf(&mut buf));
},
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut buf))?;
}
_ => {
log::trace!(" -> not a queued data frame");
try_ready!(self.inner.write_buf(&mut self.buf));
},
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut self.buf))?;
}
}
}
@@ -196,11 +198,10 @@ where
self.last_data_frame = Some(frame);
debug_assert!(self.is_empty());
break;
},
}
Some(Next::Continuation(frame)) => {
// Buffer the continuation frame, then try to write again
if let Some(continuation) = frame.encode(&mut self.hpack, self.buf.get_mut()) {
// We previously had a CONTINUATION, and after encoding
// it, we got *another* one? Let's just double check
// that at least some progress is being made...
@@ -213,7 +214,7 @@ where
self.next = Some(Next::Continuation(continuation));
}
},
}
None => {
break;
}
@@ -222,15 +223,15 @@ where
log::trace!("flushing buffer");
// Flush the upstream
try_nb!(self.inner.flush());
ready!(Pin::new(&mut self.inner).poll_flush(cx))?;
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
}
/// Close the codec
pub fn shutdown(&mut self) -> Poll<(), io::Error> {
try_ready!(self.flush());
self.inner.shutdown().map_err(Into::into)
pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
ready!(self.flush(cx))?;
Pin::new(&mut self.inner).poll_shutdown(cx)
}
fn has_capacity(&self) -> bool {
@@ -267,23 +268,18 @@ impl<T, B> FramedWrite<T, B> {
}
}
impl<T: io::Read, B> io::Read for FramedWrite<T, B> {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
self.inner.read(dst)
}
}
impl<T: AsyncRead, B> AsyncRead for FramedWrite<T, B> {
fn read_buf<B2: BufMut>(&mut self, buf: &mut B2) -> Poll<usize, io::Error>
where
Self: Sized,
{
self.inner.read_buf(buf)
}
impl<T: AsyncRead + Unpin, B: Unpin> AsyncRead for FramedWrite<T, B> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}
#[cfg(feature = "unstable")]

View File

@@ -14,10 +14,11 @@ use crate::frame::{self, Data, Frame};
use futures::*;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
use bytes::Buf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_codec::length_delimited;
use tokio_io::{AsyncRead, AsyncWrite};
use std::io;
@@ -28,8 +29,8 @@ pub struct Codec<T, B> {
impl<T, B> Codec<T, B>
where
T: AsyncRead + AsyncWrite,
B: Buf,
T: AsyncRead + AsyncWrite + Unpin,
B: Buf + Unpin,
{
/// Returns a new `Codec` with the default max frame size
#[inline]
@@ -55,9 +56,7 @@ where
// Use FramedRead's method since it checks the value is within range.
inner.set_max_frame_size(max_frame_size);
Codec {
inner,
}
Codec { inner }
}
}
@@ -121,12 +120,12 @@ impl<T, B> Codec<T, B> {
impl<T, B> Codec<T, B>
where
T: AsyncWrite,
B: Buf,
T: AsyncWrite + Unpin,
B: Buf + Unpin,
{
/// Returns `Ready` when the codec can buffer a frame
pub fn poll_ready(&mut self) -> Poll<(), io::Error> {
self.framed_write().poll_ready()
pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
self.framed_write().poll_ready(cx)
}
/// Buffer a frame.
@@ -140,60 +139,59 @@ where
}
/// Flush buffered data to the wire
pub fn flush(&mut self) -> Poll<(), io::Error> {
self.framed_write().flush()
pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
self.framed_write().flush(cx)
}
/// Shutdown the send half
pub fn shutdown(&mut self) -> Poll<(), io::Error> {
self.framed_write().shutdown()
pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
self.framed_write().shutdown(cx)
}
}
impl<T, B> Stream for Codec<T, B>
where
T: AsyncRead,
T: AsyncRead + Unpin,
B: Unpin,
{
type Item = Frame;
type Error = RecvError;
type Item = Result<Frame, RecvError>;
fn poll(&mut self) -> Poll<Option<Frame>, Self::Error> {
self.inner.poll()
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
impl<T, B> Sink for Codec<T, B>
impl<T, B> Sink<Frame<B>> for Codec<T, B>
where
T: AsyncWrite,
B: Buf,
T: AsyncWrite + Unpin,
B: Buf + Unpin,
{
type SinkItem = Frame<B>;
type SinkError = SendError;
type Error = SendError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
if !self.poll_ready()?.is_ready() {
return Ok(AsyncSink::NotReady(item));
}
self.buffer(item)?;
Ok(AsyncSink::Ready)
fn start_send(mut self: Pin<&mut Self>, item: Frame<B>) -> Result<(), Self::Error> {
Codec::buffer(&mut self, item)?;
Ok(())
}
/// Returns `Ready` when the codec can buffer a frame
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.framed_write().poll_ready(cx).map_err(Into::into)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.flush()?;
Ok(Async::Ready(()))
/// Flush buffered data to the wire
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.framed_write().flush(cx).map_err(Into::into)
}
fn close(&mut self) -> Poll<(), Self::SinkError> {
self.shutdown()?;
Ok(Async::Ready(()))
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.shutdown(cx))?;
Poll::Ready(Ok(()))
}
}
// TODO: remove (or improve) this
impl<T> From<T> for Codec<T, ::std::io::Cursor<::bytes::Bytes>>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Unpin,
{
fn from(src: T) -> Self {
Self::new(src)