use std::fmt;
use std::fs::File;
use std::future::Future;
use std::io::{self, Cursor, Read};
use bytes::Bytes;
use crate::async_impl;
/// The body of a `Request`.
///
/// In most cases, this is not needed directly, as the
/// [`RequestBuilder.body`][builder] method uses `Into
`, which allows
/// passing many things (like a string or vector of bytes).
///
/// [builder]: ./struct.RequestBuilder.html#method.body
#[derive(Debug)]
pub struct Body {
    kind: Kind,
}
impl Body {
    /// Instantiate a `Body` from a reader.
    ///
    /// # Note
    ///
    /// While allowing for many types to be used, these bodies do not have
    /// a way to reset to the beginning and be reused. This means that when
    /// encountering a 307 or 308 status code, instead of repeating the
    /// request at the new location, the `Response` will be returned with
    /// the redirect status code set.
    ///
    /// ```rust
    /// # use std::fs::File;
    /// # use reqwest::blocking::Body;
    /// # fn run() -> Result<(), Box> {
    /// let file = File::open("national_secrets.txt")?;
    /// let body = Body::new(file);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// If you have a set of bytes, like `String` or `Vec`, using the
    /// `From` implementations for `Body` will store the data in a manner
    /// it can be reused.
    ///
    /// ```rust
    /// # use reqwest::blocking::Body;
    /// # fn run() -> Result<(), Box> {
    /// let s = "A stringy body";
    /// let body = Body::from(s);
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(reader: R) -> Body {
        Body {
            kind: Kind::Reader(Box::from(reader), None),
        }
    }
    /// Create a `Body` from a `Read` where the size is known in advance
    /// but the data should not be fully loaded into memory. This will
    /// set the `Content-Length` header and stream from the `Read`.
    ///
    /// ```rust
    /// # use std::fs::File;
    /// # use reqwest::blocking::Body;
    /// # fn run() -> Result<(), Box> {
    /// let file = File::open("a_large_file.txt")?;
    /// let file_size = file.metadata()?.len();
    /// let body = Body::sized(file, file_size);
    /// # Ok(())
    /// # }
    /// ```
    pub fn sized(reader: R, len: u64) -> Body {
        Body {
            kind: Kind::Reader(Box::from(reader), Some(len)),
        }
    }
    pub(crate) fn len(&self) -> Option {
        match self.kind {
            Kind::Reader(_, len) => len,
            Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
        }
    }
    pub(crate) fn into_reader(self) -> Reader {
        match self.kind {
            Kind::Reader(r, _) => Reader::Reader(r),
            Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
        }
    }
    pub(crate) fn into_async(self) -> (Option, async_impl::Body, Option) {
        match self.kind {
            Kind::Reader(read, len) => {
                let (tx, rx) = hyper::Body::channel();
                let tx = Sender {
                    body: (read, len),
                    tx,
                };
                (Some(tx), async_impl::Body::wrap(rx), len)
            }
            Kind::Bytes(chunk) => {
                let len = chunk.len() as u64;
                (None, async_impl::Body::reusable(chunk), Some(len))
            }
        }
    }
    pub(crate) fn try_clone(&self) -> Option {
        self.kind.try_clone().map(|kind| Body { kind })
    }
}
enum Kind {
    Reader(Box, Option),
    Bytes(Bytes),
}
impl Kind {
    fn try_clone(&self) -> Option {
        match self {
            Kind::Reader(..) => None,
            Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
        }
    }
}
impl From> for Body {
    #[inline]
    fn from(v: Vec) -> Body {
        Body {
            kind: Kind::Bytes(v.into()),
        }
    }
}
impl From for Body {
    #[inline]
    fn from(s: String) -> Body {
        s.into_bytes().into()
    }
}
impl From<&'static [u8]> for Body {
    #[inline]
    fn from(s: &'static [u8]) -> Body {
        Body {
            kind: Kind::Bytes(Bytes::from_static(s)),
        }
    }
}
impl From<&'static str> for Body {
    #[inline]
    fn from(s: &'static str) -> Body {
        s.as_bytes().into()
    }
}
impl From for Body {
    #[inline]
    fn from(f: File) -> Body {
        let len = f.metadata().map(|m| m.len()).ok();
        Body {
            kind: Kind::Reader(Box::new(f), len),
        }
    }
}
impl fmt::Debug for Kind {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            Kind::Reader(_, ref v) => f
                .debug_struct("Reader")
                .field("length", &DebugLength(v))
                .finish(),
            Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
        }
    }
}
struct DebugLength<'a>(&'a Option);
impl<'a> fmt::Debug for DebugLength<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self.0 {
            Some(ref len) => fmt::Debug::fmt(len, f),
            None => f.write_str("Unknown"),
        }
    }
}
pub(crate) enum Reader {
    Reader(Box),
    Bytes(Cursor),
}
impl Read for Reader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result {
        match *self {
            Reader::Reader(ref mut rdr) => rdr.read(buf),
            Reader::Bytes(ref mut rdr) => rdr.read(buf),
        }
    }
}
pub(crate) struct Sender {
    body: (Box, Option),
    tx: hyper::body::Sender,
}
async fn send_future(sender: Sender) -> Result<(), crate::Error> {
    use bytes::{BufMut, BytesMut};
    use std::cmp;
    let con_len = sender.body.1;
    let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
    let mut written = 0;
    let mut buf = BytesMut::with_capacity(cap as usize);
    let mut body = sender.body.0;
    // Put in an option so that it can be consumed on error to call abort()
    let mut tx = Some(sender.tx);
    loop {
        if Some(written) == con_len {
            // Written up to content-length, so stop.
            return Ok(());
        }
        // The input stream is read only if the buffer is empty so
        // that there is only one read in the buffer at any time.
        //
        // We need to know whether there is any data to send before
        // we check the transmission channel (with poll_ready below)
        // because somestimes the receiver disappears as soon as is
        // considers the data is completely transmitted, which may
        // be true.
        //
        // The use case is a web server that closes its
        // input stream as soon as the data received is valid JSON.
        // This behaviour is questionable, but it exists and the
        // fact is that there is actually no remaining data to read.
        if buf.is_empty() {
            if buf.remaining_mut() == 0 {
                buf.reserve(8192);
            }
            match body.read(unsafe { buf.bytes_mut() }) {
                Ok(0) => {
                    // The buffer was empty and nothing's left to
                    // read. Return.
                    return Ok(());
                }
                Ok(n) => unsafe {
                    buf.advance_mut(n);
                },
                Err(e) => {
                    tx.take().expect("tx only taken on error").abort();
                    return Err(crate::error::body(e));
                }
            }
        }
        // The only way to get here is when the buffer is not empty.
        // We can check the transmission channel
        let buf_len = buf.len() as u64;
        tx.as_mut()
            .expect("tx only taken on error")
            .send_data(buf.take().freeze().into())
            .await
            .map_err(crate::error::body)?;
        written += buf_len;
    }
}
impl Sender {
    // A `Future` that may do blocking read calls.
    // As a `Future`, this integrates easily with `wait::timeout`.
    pub(crate) fn send(self) -> impl Future