//! Streaming bodies for Requests and Responses use std::borrow::Cow; use std::fmt; use bytes::Bytes; use futures::{Async, Future, Poll, Stream}; use futures::sync::{mpsc, oneshot}; use http::HeaderMap; use common::Never; use super::Chunk; type BodySender = mpsc::Sender>; /// This trait represents a streaming body of a `Request` or `Response`. pub trait Payload { /// A buffer of bytes representing a single chunk of a body. type Data: AsRef<[u8]>; /// The error type of this stream. type Error: Into>; /// Poll for a `Data` buffer. /// /// Similar to `Stream::poll_next`, this yields `Some(Data)` until /// the body ends, when it yields `None`. fn poll_data(&mut self) -> Poll, Self::Error>; /// Poll for an optional **single** `HeaderMap` of trailers. /// /// This should **only** be called after `poll_data` has ended. /// /// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2. fn poll_trailers(&mut self) -> Poll, Self::Error> { Ok(Async::Ready(None)) } /// A hint that the `Body` is complete, and doesn't need to be polled more. /// /// This can be useful to determine if the there is any body or trailers /// without having to poll. An empty `Body` could return `true` and hyper /// would be able to know that only the headers need to be sent. Or, it can /// also be checked after each `poll_data` call, to allow hyper to try to /// end the underlying stream with the last chunk, instead of needing to /// send an extra `DATA` frame just to mark the stream as finished. /// /// As a hint, it is used to try to optimize, and thus is OK for a default /// implementation to return `false`. fn is_end_stream(&self) -> bool { false } /// Return a length of the total bytes that will be streamed, if known. /// /// If an exact size of bytes is known, this would allow hyper to send a /// `Content-Length` header automatically, not needing to fall back to /// `Transfer-Encoding: chunked`. /// /// This does not need to be kept updated after polls, it will only be /// called once to create the headers. fn content_length(&self) -> Option { None } } impl Payload for Box { type Data = E::Data; type Error = E::Error; fn poll_data(&mut self) -> Poll, Self::Error> { (**self).poll_data() } fn poll_trailers(&mut self) -> Poll, Self::Error> { (**self).poll_trailers() } fn is_end_stream(&self) -> bool { (**self).is_end_stream() } fn content_length(&self) -> Option { (**self).content_length() } } /// A `Payload` of `Chunk`s, used when receiving bodies. /// /// Also a good default `Payload` to use in many applications. #[must_use = "streams do nothing unless polled"] pub struct Body { kind: Kind, /// Allow the client to pass a future to delay the `Body` from returning /// EOF. This allows the `Client` to try to put the idle connection /// back into the pool before the body is "finished". /// /// The reason for this is so that creating a new request after finishing /// streaming the body of a response could sometimes result in creating /// a brand new connection, since the pool didn't know about the idle /// connection yet. delayed_eof: Option, } enum Kind { Chan { _close_tx: oneshot::Sender<()>, rx: mpsc::Receiver>, }, Wrapped(Box> + Send>), Once(Option), Empty, } type DelayEofUntil = oneshot::Receiver; enum DelayEof { /// Initial state, stream hasn't seen EOF yet. NotEof(DelayEofUntil), /// Transitions to this state once we've seen `poll` try to /// return EOF (`None`). This future is then polled, and /// when it completes, the Body finally returns EOF (`None`). Eof(DelayEofUntil), } /// A sender half used with `Body::channel()`. #[derive(Debug)] pub struct Sender { close_rx: oneshot::Receiver<()>, tx: BodySender, } impl Body { /// Create an empty `Body` stream. /// /// # Example /// /// ``` /// use hyper::{Body, Request}; /// /// // create a `GET /` request /// let get = Request::new(Body::empty()); /// ``` #[inline] pub fn empty() -> Body { Body::new(Kind::Empty) } /// Create a `Body` stream with an associated sender half. #[inline] pub fn channel() -> (Sender, Body) { let (tx, rx) = mpsc::channel(0); let (close_tx, close_rx) = oneshot::channel(); let tx = Sender { close_rx: close_rx, tx: tx, }; let rx = Body::new(Kind::Chan { _close_tx: close_tx, rx: rx, }); (tx, rx) } /// Wrap a futures `Stream` in a box inside `Body`. /// /// # Example /// /// ``` /// # extern crate futures; /// # extern crate hyper; /// # use hyper::Body; /// # fn main() { /// let chunks = vec![ /// "hello", /// " ", /// "world", /// ]; /// /// let stream = futures::stream::iter_ok::<_, ::std::io::Error>(chunks); /// /// let body = Body::wrap_stream(stream); /// # } /// ``` pub fn wrap_stream(stream: S) -> Body where S: Stream + Send + 'static, S::Error: Into>, Chunk: From, { let mapped = stream .map(Chunk::from) .map_err(Into::into); Body::new(Kind::Wrapped(Box::new(mapped))) } /// Returns if this body was constructed via `Body::empty()`. /// /// # Note /// /// This does **not** detect if the body stream may be at the end, or /// if the stream will not yield any chunks, in all cases. For instance, /// a streaming body using `chunked` encoding is not able to tell if /// there are more chunks immediately. #[inline] pub fn is_empty(&self) -> bool { match self.kind { Kind::Empty => true, _ => false, } } fn new(kind: Kind) -> Body { Body { kind: kind, delayed_eof: None, } } pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { self.delayed_eof = Some(DelayEof::NotEof(fut)); } fn poll_eof(&mut self) -> Poll, ::Error> { match self.delayed_eof.take() { Some(DelayEof::NotEof(mut delay)) => { match self.poll_inner() { ok @ Ok(Async::Ready(Some(..))) | ok @ Ok(Async::NotReady) => { self.delayed_eof = Some(DelayEof::NotEof(delay)); ok }, Ok(Async::Ready(None)) => match delay.poll() { Ok(Async::Ready(never)) => match never {}, Ok(Async::NotReady) => { self.delayed_eof = Some(DelayEof::Eof(delay)); Ok(Async::NotReady) }, Err(_done) => { Ok(Async::Ready(None)) }, }, Err(e) => Err(e), } }, Some(DelayEof::Eof(mut delay)) => { match delay.poll() { Ok(Async::Ready(never)) => match never {}, Ok(Async::NotReady) => { self.delayed_eof = Some(DelayEof::Eof(delay)); Ok(Async::NotReady) }, Err(_done) => { Ok(Async::Ready(None)) }, } }, None => self.poll_inner(), } } fn poll_inner(&mut self) -> Poll, ::Error> { match self.kind { Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") { Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))), Async::Ready(Some(Err(err))) => Err(err), Async::Ready(None) => Ok(Async::Ready(None)), Async::NotReady => Ok(Async::NotReady), }, Kind::Wrapped(ref mut s) => s.poll().map_err(::Error::new_body), Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), Kind::Empty => Ok(Async::Ready(None)), } } } impl Default for Body { #[inline] fn default() -> Body { Body::empty() } } impl Payload for Body { type Data = Chunk; type Error = ::Error; fn poll_data(&mut self) -> Poll, Self::Error> { self.poll_eof() } fn is_end_stream(&self) -> bool { match self.kind { Kind::Chan { .. } => false, Kind::Wrapped(..) => false, Kind::Once(ref val) => val.is_none(), Kind::Empty => true } } fn content_length(&self) -> Option { match self.kind { Kind::Chan { .. } => None, Kind::Wrapped(..) => None, Kind::Once(Some(ref val)) => Some(val.len() as u64), Kind::Once(None) => None, Kind::Empty => Some(0) } } } impl Stream for Body { type Item = Chunk; type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { self.poll_data() } } impl fmt::Debug for Body { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Body") .finish() } } impl Sender { /// Check to see if this `Sender` can send more data. pub fn poll_ready(&mut self) -> Poll<(), ()> { match self.close_rx.poll() { Ok(Async::Ready(())) | Err(_) => return Err(()), Ok(Async::NotReady) => (), } self.tx.poll_ready().map_err(|_| ()) } /// Sends data on this channel. /// /// This should be called after `poll_ready` indicated the channel /// could accept another `Chunk`. /// /// Returns `Err(Chunk)` if the channel could not (currently) accept /// another `Chunk`. pub fn send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> { self.tx.try_send(Ok(chunk)) .map_err(|err| err.into_inner().expect("just sent Ok")) } pub(crate) fn send_error(&mut self, err: ::Error) { let _ = self.tx.try_send(Err(err)); } } impl From for Body { #[inline] fn from(chunk: Chunk) -> Body { if chunk.is_empty() { Body::empty() } else { Body::new(Kind::Once(Some(chunk))) } } } impl From for Body { #[inline] fn from (bytes: Bytes) -> Body { Body::from(Chunk::from(bytes)) } } impl From> for Body { #[inline] fn from (vec: Vec) -> Body { Body::from(Chunk::from(vec)) } } impl From<&'static [u8]> for Body { #[inline] fn from (slice: &'static [u8]) -> Body { Body::from(Chunk::from(slice)) } } impl From> for Body { #[inline] fn from (cow: Cow<'static, [u8]>) -> Body { match cow { Cow::Borrowed(b) => Body::from(b), Cow::Owned(o) => Body::from(o) } } } impl From for Body { #[inline] fn from (s: String) -> Body { Body::from(Chunk::from(s.into_bytes())) } } impl From<&'static str> for Body { #[inline] fn from(slice: &'static str) -> Body { Body::from(Chunk::from(slice.as_bytes())) } } impl From> for Body { #[inline] fn from(cow: Cow<'static, str>) -> Body { match cow { Cow::Borrowed(b) => Body::from(b), Cow::Owned(o) => Body::from(o) } } } fn _assert_send_sync() { fn _assert_send() {} fn _assert_sync() {} _assert_send::(); _assert_send::(); _assert_sync::(); } #[test] fn test_body_stream_concat() { use futures::{Stream, Future}; let body = Body::from("hello world"); let total = body .concat2() .wait() .unwrap(); assert_eq!(total.as_ref(), b"hello world"); }