use std::borrow::Cow; use std::fmt; use bytes::Bytes; use futures::sync::{mpsc, oneshot}; use futures::{Async, Future, Poll, Stream}; use h2; use http::HeaderMap; use common::Never; use super::internal::{FullDataArg, FullDataRet}; use super::{Chunk, Payload}; use upgrade::OnUpgrade; type BodySender = mpsc::Sender>; /// A stream of `Chunk`s, used when receiving bodies. /// /// A good default `Payload` to use in many applications. /// /// Also implements `futures::Stream`, so stream combinators may be used. #[must_use = "streams do nothing unless polled"] pub struct Body { kind: Kind, /// Keep the extra bits in an `Option>`, so that /// Body stays small in the common case (no extras needed). extra: Option>, } enum Kind { Once(Option), Chan { content_length: Option, abort_rx: oneshot::Receiver<()>, rx: mpsc::Receiver>, }, H2 { content_length: Option, recv: h2::RecvStream, }, Wrapped(Box> + Send>), } struct Extra { /// Allow the client to pass a future to delay the `Body` from returning /// EOF. This allows the `Client` to try to put the idle connection /// back into the pool before the body is "finished". /// /// The reason for this is so that creating a new request after finishing /// streaming the body of a response could sometimes result in creating /// a brand new connection, since the pool didn't know about the idle /// connection yet. delayed_eof: Option, on_upgrade: OnUpgrade, } type DelayEofUntil = oneshot::Receiver; enum DelayEof { /// Initial state, stream hasn't seen EOF yet. NotEof(DelayEofUntil), /// Transitions to this state once we've seen `poll` try to /// return EOF (`None`). This future is then polled, and /// when it completes, the Body finally returns EOF (`None`). Eof(DelayEofUntil), } /// A sender half used with `Body::channel()`. /// /// Useful when wanting to stream chunks from another thread. See /// [`Body::channel`](Body::channel) for more. #[must_use = "Sender does nothing unless sent on"] #[derive(Debug)] pub struct Sender { abort_tx: oneshot::Sender<()>, tx: BodySender, } impl Body { /// Create an empty `Body` stream. /// /// # Example /// /// ``` /// use hyper::{Body, Request}; /// /// // create a `GET /` request /// let get = Request::new(Body::empty()); /// ``` #[inline] pub fn empty() -> Body { Body::new(Kind::Once(None)) } /// Create a `Body` stream with an associated sender half. /// /// Useful when wanting to stream chunks from another thread. #[inline] pub fn channel() -> (Sender, Body) { Self::new_channel(None) } pub(crate) fn new_channel(content_length: Option) -> (Sender, Body) { let (tx, rx) = mpsc::channel(0); let (abort_tx, abort_rx) = oneshot::channel(); let tx = Sender { abort_tx: abort_tx, tx: tx, }; let rx = Body::new(Kind::Chan { content_length, abort_rx, rx, }); (tx, rx) } /// Wrap a futures `Stream` in a box inside `Body`. /// /// # Example /// /// ``` /// # extern crate futures; /// # extern crate hyper; /// # use hyper::Body; /// # fn main() { /// let chunks = vec![ /// "hello", /// " ", /// "world", /// ]; /// /// let stream = futures::stream::iter_ok::<_, ::std::io::Error>(chunks); /// /// let body = Body::wrap_stream(stream); /// # } /// ``` pub fn wrap_stream(stream: S) -> Body where S: Stream + Send + 'static, S::Error: Into>, Chunk: From, { let mapped = stream.map(Chunk::from).map_err(Into::into); Body::new(Kind::Wrapped(Box::new(mapped))) } /// Converts this `Body` into a `Future` of a pending HTTP upgrade. /// /// See [the `upgrade` module](::upgrade) for more. pub fn on_upgrade(self) -> OnUpgrade { self .extra .map(|ex| ex.on_upgrade) .unwrap_or_else(OnUpgrade::none) } fn new(kind: Kind) -> Body { Body { kind: kind, extra: None, } } pub(crate) fn h2(recv: h2::RecvStream, content_length: Option) -> Self { Body::new(Kind::H2 { content_length, recv, }) } pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) { debug_assert!(!upgrade.is_none(), "set_on_upgrade with empty upgrade"); let extra = self.extra_mut(); debug_assert!(extra.on_upgrade.is_none(), "set_on_upgrade twice"); extra.on_upgrade = upgrade; } pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut)); } fn take_delayed_eof(&mut self) -> Option { self .extra .as_mut() .and_then(|extra| extra.delayed_eof.take()) } fn extra_mut(&mut self) -> &mut Extra { self .extra .get_or_insert_with(|| Box::new(Extra { delayed_eof: None, on_upgrade: OnUpgrade::none(), })) } fn poll_eof(&mut self) -> Poll, ::Error> { match self.take_delayed_eof() { Some(DelayEof::NotEof(mut delay)) => { match self.poll_inner() { ok @ Ok(Async::Ready(Some(..))) | ok @ Ok(Async::NotReady) => { self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay)); ok }, Ok(Async::Ready(None)) => match delay.poll() { Ok(Async::Ready(never)) => match never {}, Ok(Async::NotReady) => { self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); Ok(Async::NotReady) }, Err(_done) => { Ok(Async::Ready(None)) }, }, Err(e) => Err(e), } }, Some(DelayEof::Eof(mut delay)) => { match delay.poll() { Ok(Async::Ready(never)) => match never {}, Ok(Async::NotReady) => { self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); Ok(Async::NotReady) }, Err(_done) => { Ok(Async::Ready(None)) }, } }, None => self.poll_inner(), } } fn poll_inner(&mut self) -> Poll, ::Error> { match self.kind { Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), Kind::Chan { content_length: ref mut len, ref mut rx, ref mut abort_rx, } => { if let Ok(Async::Ready(())) = abort_rx.poll() { return Err(::Error::new_body_write("body write aborted")); } match rx.poll().expect("mpsc cannot error") { Async::Ready(Some(Ok(chunk))) => { if let Some(ref mut len) = *len { debug_assert!(*len >= chunk.len() as u64); *len = *len - chunk.len() as u64; } Ok(Async::Ready(Some(chunk))) } Async::Ready(Some(Err(err))) => Err(err), Async::Ready(None) => Ok(Async::Ready(None)), Async::NotReady => Ok(Async::NotReady), } } Kind::H2 { recv: ref mut h2, .. } => h2 .poll() .map(|async| { async.map(|opt| { opt.map(|bytes| { let _ = h2.release_capacity().release_capacity(bytes.len()); Chunk::from(bytes) }) }) }) .map_err(::Error::new_body), Kind::Wrapped(ref mut s) => s.poll().map_err(::Error::new_body), } } } impl Default for Body { /// Returns [`Body::empty()`](Body::empty). #[inline] fn default() -> Body { Body::empty() } } impl Payload for Body { type Data = Chunk; type Error = ::Error; fn poll_data(&mut self) -> Poll, Self::Error> { self.poll_eof() } fn poll_trailers(&mut self) -> Poll, Self::Error> { match self.kind { Kind::H2 { recv: ref mut h2, .. } => h2.poll_trailers().map_err(::Error::new_h2), _ => Ok(Async::Ready(None)), } } fn is_end_stream(&self) -> bool { match self.kind { Kind::Once(ref val) => val.is_none(), Kind::Chan { content_length, .. } => content_length == Some(0), Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), Kind::Wrapped(..) => false, } } fn content_length(&self) -> Option { match self.kind { Kind::Once(Some(ref val)) => Some(val.len() as u64), Kind::Once(None) => Some(0), Kind::Wrapped(..) => None, Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => content_length, } } // We can improve the performance of `Body` when we know it is a Once kind. #[doc(hidden)] fn __hyper_full_data(&mut self, _: FullDataArg) -> FullDataRet { match self.kind { Kind::Once(ref mut val) => FullDataRet(val.take()), _ => FullDataRet(None), } } } impl Stream for Body { type Item = Chunk; type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { self.poll_data() } } impl fmt::Debug for Body { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Body").finish() } } impl Sender { /// Check to see if this `Sender` can send more data. pub fn poll_ready(&mut self) -> Poll<(), ::Error> { match self.abort_tx.poll_cancel() { Ok(Async::Ready(())) | Err(_) => return Err(::Error::new_closed()), Ok(Async::NotReady) => (), } self.tx.poll_ready().map_err(|_| ::Error::new_closed()) } /// Sends data on this channel. /// /// This should be called after `poll_ready` indicated the channel /// could accept another `Chunk`. /// /// Returns `Err(Chunk)` if the channel could not (currently) accept /// another `Chunk`. pub fn send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> { self.tx .try_send(Ok(chunk)) .map_err(|err| err.into_inner().expect("just sent Ok")) } /// Aborts the body in an abnormal fashion. pub fn abort(self) { let _ = self.abort_tx.send(()); } pub(crate) fn send_error(&mut self, err: ::Error) { let _ = self.tx.try_send(Err(err)); } } impl From for Body { #[inline] fn from(chunk: Chunk) -> Body { if chunk.is_empty() { Body::empty() } else { Body::new(Kind::Once(Some(chunk))) } } } impl From> + Send + 'static>> for Body { #[inline] fn from( stream: Box< Stream> + Send + 'static, >, ) -> Body { Body::new(Kind::Wrapped(stream)) } } impl From for Body { #[inline] fn from(bytes: Bytes) -> Body { Body::from(Chunk::from(bytes)) } } impl From> for Body { #[inline] fn from(vec: Vec) -> Body { Body::from(Chunk::from(vec)) } } impl From<&'static [u8]> for Body { #[inline] fn from(slice: &'static [u8]) -> Body { Body::from(Chunk::from(slice)) } } impl From> for Body { #[inline] fn from(cow: Cow<'static, [u8]>) -> Body { match cow { Cow::Borrowed(b) => Body::from(b), Cow::Owned(o) => Body::from(o), } } } impl From for Body { #[inline] fn from(s: String) -> Body { Body::from(Chunk::from(s.into_bytes())) } } impl From<&'static str> for Body { #[inline] fn from(slice: &'static str) -> Body { Body::from(Chunk::from(slice.as_bytes())) } } impl From> for Body { #[inline] fn from(cow: Cow<'static, str>) -> Body { match cow { Cow::Borrowed(b) => Body::from(b), Cow::Owned(o) => Body::from(o), } } } #[test] fn test_body_stream_concat() { let body = Body::from("hello world"); let total = body.concat2().wait().unwrap(); assert_eq!(total.as_ref(), b"hello world"); }