fix(lib): remove deprecated tokio-proto APIs
BREAKING CHANGE: Many of these APIs have been deprecated for a while, check the documentation for the recommended way to use hyper now.
This commit is contained in:
@@ -3,14 +3,10 @@ use std::fmt;
|
||||
use bytes::Bytes;
|
||||
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
|
||||
use futures::sync::{mpsc, oneshot};
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
use tokio_proto;
|
||||
use std::borrow::Cow;
|
||||
|
||||
use super::Chunk;
|
||||
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
pub type TokioBody = tokio_proto::streaming::Body<Chunk, ::Error>;
|
||||
pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
|
||||
|
||||
/// A `Stream` for `Chunk`s used in requests and responses.
|
||||
@@ -21,8 +17,6 @@ pub struct Body {
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Kind {
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
Tokio(TokioBody),
|
||||
Chan {
|
||||
close_tx: oneshot::Sender<bool>,
|
||||
rx: mpsc::Receiver<Result<Chunk, ::Error>>,
|
||||
@@ -77,8 +71,6 @@ impl Body {
|
||||
|
||||
fn poll_inner(&mut self) -> Poll<Option<Chunk>, ::Error> {
|
||||
match self.kind {
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
Kind::Tokio(ref mut rx) => rx.poll(),
|
||||
Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") {
|
||||
Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))),
|
||||
Async::Ready(Some(Err(err))) => Err(err),
|
||||
@@ -160,42 +152,6 @@ impl ChunkSender {
|
||||
}
|
||||
}
|
||||
|
||||
feat_server_proto! {
|
||||
impl From<Body> for tokio_proto::streaming::Body<Chunk, ::Error> {
|
||||
fn from(b: Body) -> tokio_proto::streaming::Body<Chunk, ::Error> {
|
||||
match b.kind {
|
||||
Kind::Tokio(b) => b,
|
||||
Kind::Chan { close_tx, rx } => {
|
||||
// disable knowing if the Rx gets dropped, since we cannot
|
||||
// pass this tx along.
|
||||
let _ = close_tx.send(false);
|
||||
rx.into()
|
||||
},
|
||||
Kind::Once(Some(chunk)) => TokioBody::from(chunk),
|
||||
Kind::Once(None) |
|
||||
Kind::Empty => TokioBody::empty(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio_proto::streaming::Body<Chunk, ::Error>> for Body {
|
||||
fn from(tokio_body: tokio_proto::streaming::Body<Chunk, ::Error>) -> Body {
|
||||
Body::new(Kind::Tokio(tokio_body))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<mpsc::Receiver<Result<Chunk, ::Error>>> for Body {
|
||||
#[inline]
|
||||
fn from(src: mpsc::Receiver<Result<Chunk, ::Error>>) -> Body {
|
||||
let (tx, _) = oneshot::channel();
|
||||
Body::new(Kind::Chan {
|
||||
close_tx: tx,
|
||||
rx: src,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Chunk> for Body {
|
||||
#[inline]
|
||||
fn from (chunk: Chunk) -> Body {
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::fmt;
|
||||
//use std::mem;
|
||||
|
||||
use bytes::Bytes;
|
||||
|
||||
|
||||
@@ -4,12 +4,8 @@ use std::marker::PhantomData;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{Async, AsyncSink, Poll, StartSend};
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
use futures::{Sink, Stream};
|
||||
use futures::task::Task;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
use tokio_proto::streaming::pipeline::{Frame, Transport};
|
||||
|
||||
use proto::{Chunk, Decode, Http1Transaction, MessageHead};
|
||||
use super::io::{Cursor, Buffered};
|
||||
@@ -81,70 +77,6 @@ where I: AsyncRead + AsyncWrite,
|
||||
self.io.into_inner()
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
fn poll_incoming(&mut self) -> Poll<Option<Frame<MessageHead<T::Incoming>, Chunk, ::Error>>, io::Error> {
|
||||
trace!("Conn::poll_incoming()");
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ParseEof;
|
||||
|
||||
impl fmt::Display for ParseEof {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.write_str(::std::error::Error::description(self))
|
||||
}
|
||||
}
|
||||
|
||||
impl ::std::error::Error for ParseEof {
|
||||
fn description(&self) -> &str {
|
||||
"end of file reached before parsing could complete"
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
if self.is_read_closed() {
|
||||
trace!("Conn::poll when closed");
|
||||
return Ok(Async::Ready(None));
|
||||
} else if self.can_read_head() {
|
||||
return match self.read_head() {
|
||||
Ok(Async::Ready(Some((head, body)))) => {
|
||||
Ok(Async::Ready(Some(Frame::Message {
|
||||
message: head,
|
||||
body: body,
|
||||
})))
|
||||
},
|
||||
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(::Error::Io(err)) => Err(err),
|
||||
Err(::Error::Incomplete) => {
|
||||
Err(io::Error::new(io::ErrorKind::UnexpectedEof, ParseEof))
|
||||
},
|
||||
Err(err) => Ok(Async::Ready(Some(Frame::Error {
|
||||
error: err,
|
||||
}))),
|
||||
};
|
||||
} else if self.can_read_body() {
|
||||
return self.read_body()
|
||||
.map(|async| async.map(|chunk| Some(Frame::Body {
|
||||
chunk: chunk
|
||||
})))
|
||||
.or_else(|err| {
|
||||
self.state.close_read();
|
||||
Ok(Async::Ready(Some(Frame::Error { error: err.into() })))
|
||||
});
|
||||
} else {
|
||||
trace!("poll when on keep-alive");
|
||||
if !T::should_read_first() {
|
||||
self.require_empty_read()?;
|
||||
if self.is_read_closed() {
|
||||
return Ok(Async::Ready(None));
|
||||
}
|
||||
}
|
||||
self.maybe_park_read();
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_read_closed(&self) -> bool {
|
||||
self.state.is_read_closed()
|
||||
}
|
||||
@@ -667,101 +599,6 @@ where I: AsyncRead + AsyncWrite,
|
||||
}
|
||||
}
|
||||
|
||||
// ==== tokio_proto impl ====
|
||||
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
impl<I, B, T> Stream for Conn<I, B, T>
|
||||
where I: AsyncRead + AsyncWrite,
|
||||
B: AsRef<[u8]>,
|
||||
T: Http1Transaction,
|
||||
T::Outgoing: fmt::Debug {
|
||||
type Item = Frame<MessageHead<T::Incoming>, Chunk, ::Error>;
|
||||
type Error = io::Error;
|
||||
|
||||
#[inline]
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
self.poll_incoming().map_err(|err| {
|
||||
debug!("poll error: {}", err);
|
||||
err
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
impl<I, B, T> Sink for Conn<I, B, T>
|
||||
where I: AsyncRead + AsyncWrite,
|
||||
B: AsRef<[u8]>,
|
||||
T: Http1Transaction,
|
||||
T::Outgoing: fmt::Debug {
|
||||
type SinkItem = Frame<MessageHead<T::Outgoing>, B, ::Error>;
|
||||
type SinkError = io::Error;
|
||||
|
||||
#[inline]
|
||||
fn start_send(&mut self, frame: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
|
||||
trace!("Conn::start_send( frame={:?} )", DebugFrame(&frame));
|
||||
|
||||
let frame: Self::SinkItem = match frame {
|
||||
Frame::Message { message: head, body } => {
|
||||
if self.can_write_head() {
|
||||
self.write_head(head, body);
|
||||
return Ok(AsyncSink::Ready);
|
||||
} else {
|
||||
Frame::Message { message: head, body: body }
|
||||
}
|
||||
},
|
||||
Frame::Body { chunk } => {
|
||||
if self.can_write_body() {
|
||||
return self.write_body(chunk)
|
||||
.map(|async| {
|
||||
match async {
|
||||
AsyncSink::Ready => AsyncSink::Ready,
|
||||
AsyncSink::NotReady(chunk) => AsyncSink::NotReady(Frame::Body {
|
||||
chunk: chunk,
|
||||
})
|
||||
}
|
||||
});
|
||||
// This allows when chunk is `None`, or `Some([])`.
|
||||
} else if chunk.as_ref().map(|c| c.as_ref().len()).unwrap_or(0) == 0 {
|
||||
return Ok(AsyncSink::Ready);
|
||||
} else {
|
||||
Frame::Body { chunk: chunk }
|
||||
}
|
||||
},
|
||||
Frame::Error { error } => {
|
||||
debug!("received error, closing: {:?}", error);
|
||||
self.state.close();
|
||||
return Ok(AsyncSink::Ready);
|
||||
},
|
||||
};
|
||||
|
||||
warn!("writing illegal frame; state={:?}, frame={:?}", self.state.writing, DebugFrame(&frame));
|
||||
Err(io::Error::new(io::ErrorKind::InvalidInput, "illegal frame"))
|
||||
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
|
||||
trace!("Conn::poll_complete()");
|
||||
self.flush().map_err(|err| {
|
||||
debug!("error writing: {}", err);
|
||||
err
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn close(&mut self) -> Poll<(), Self::SinkError> {
|
||||
try_ready!(self.flush());
|
||||
self.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
impl<I, B, T> Transport for Conn<I, B, T>
|
||||
where I: AsyncRead + AsyncWrite + 'static,
|
||||
B: AsRef<[u8]> + 'static,
|
||||
T: Http1Transaction + 'static,
|
||||
T::Outgoing: fmt::Debug {}
|
||||
|
||||
impl<I, B: AsRef<[u8]>, T> fmt::Debug for Conn<I, B, T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("Conn")
|
||||
@@ -958,46 +795,12 @@ enum Version {
|
||||
Http11,
|
||||
}
|
||||
|
||||
// The DebugFrame and DebugChunk are simple Debug implementations that allow
|
||||
// us to dump the frame into logs, without logging the entirety of the bytes.
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
struct DebugFrame<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a>(&'a Frame<MessageHead<T>, B, ::Error>);
|
||||
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a, T, B> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self.0 {
|
||||
Frame::Message { ref body, .. } => {
|
||||
f.debug_struct("Message")
|
||||
.field("body", body)
|
||||
.finish()
|
||||
},
|
||||
Frame::Body { chunk: Some(ref chunk) } => {
|
||||
f.debug_struct("Body")
|
||||
.field("bytes", &chunk.as_ref().len())
|
||||
.finish()
|
||||
},
|
||||
Frame::Body { chunk: None } => {
|
||||
f.debug_struct("Body")
|
||||
.field("bytes", &None::<()>)
|
||||
.finish()
|
||||
},
|
||||
Frame::Error { ref error } => {
|
||||
f.debug_struct("Error")
|
||||
.field("error", error)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
//TODO: rewrite these using dispatch instead of tokio-proto API
|
||||
//TODO: rewrite these using dispatch
|
||||
mod tests {
|
||||
/*
|
||||
use futures::{Async, Future, Stream, Sink};
|
||||
use futures::future;
|
||||
use tokio_proto::streaming::pipeline::Frame;
|
||||
|
||||
use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
|
||||
use super::super::Encoder;
|
||||
@@ -1326,4 +1129,5 @@ mod tests {
|
||||
assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
|
||||
conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
@@ -13,8 +13,6 @@ use version::HttpVersion;
|
||||
use version::HttpVersion::{Http10, Http11};
|
||||
|
||||
pub use self::body::Body;
|
||||
#[cfg(feature = "tokio-proto")]
|
||||
pub use self::body::TokioBody;
|
||||
pub use self::chunk::Chunk;
|
||||
pub use self::h1::{dispatch, Conn};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user