feat(body): introduce an Entity trait to represent bodies

This dedicated `Entity` trait replaces the previous `Stream<Item=impl
AsRef<[u8]>, Error=hyper::Error>`. This allows for several improvements
immediately, and prepares for HTTP2 support.

- The `Entity::is_end_stream` makes up for change away from
  `Option<Body>`, which was previously used to know if the body should be
  empty. Since `Request` and `Response` now require a body to be set,
  this method can be used to tell hyper that the body is actually empty.

  It also provides the possibility of slight optimizations when polling
  for data, by allowing to check `is_end_stream` before polling again.
  This can allow a consumer to know that a body stream has ended without
  polling for `None` afterwards.

- The `Entity::content_length` method allows a body to automatically
  declare a size, in case a user doesn't set a `Content-Length` or
  `Transfer-Encoding` header.

- It's now possible to send and receive trailers, though this will be
  for HTTP2 connections only.

By being a trait owned by hyper, new methods can be added later as new
features are wanted (with default implementations).

The `hyper::Body` type now implements `Entity` instead of `Stream`,
provides a better channel option, and is easier to use with custom
streams via `Body::wrap_stream`.

BREAKING CHANGE: All code that was assuming the body was a `Stream` must
  be adjusted to use an `Entity` instead.

  Using `hyper::Body` as a `Stream` can call `Body::into_stream`
  to get a stream wrapper.

  Passing a custom `impl Stream` will need to either implement
  `Entity`, or as an easier option, switch to `Body::wrap_stream`.

  `Body::pair` has been replaced with `Body::channel`, which returns a
  `hyper::body::Sender` instead of a `futures::sync::mpsc::Sender`.

Closes #1438
This commit is contained in:
Sean McArthur
2018-03-14 12:40:24 -07:00
parent 3cd48b45fb
commit fbc449e49c
18 changed files with 811 additions and 485 deletions

View File

@@ -1,50 +1,236 @@
//! Streaming bodies for Requests and Responses
use std::borrow::Cow;
use std::fmt;
use bytes::Bytes;
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
use futures::{Async, Future, Poll, Stream};
use futures::sync::{mpsc, oneshot};
use std::borrow::Cow;
use http::HeaderMap;
use super::Chunk;
pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
/// A `Stream` for `Chunk`s used in requests and responses.
/// This trait represents a streaming body of a `Request` or `Response`.
pub trait Entity {
/// A buffer of bytes representing a single chunk of a body.
type Data: AsRef<[u8]>;
/// The error type of this stream.
//TODO: add bounds Into<::error::User> (or whatever it is called)
type Error;
/// Poll for a `Data` buffer.
///
/// Similar to `Stream::poll_next`, this yields `Some(Data)` until
/// the body ends, when it yields `None`.
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error>;
/// Poll for an optional **single** `HeaderMap` of trailers.
///
/// This should **only** be called after `poll_data` has ended.
///
/// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2.
fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
Ok(Async::Ready(None))
}
/// A hint that the `Body` is complete, and doesn't need to be polled more.
///
/// This can be useful to determine if the there is any body or trailers
/// without having to poll. An empty `Body` could return `true` and hyper
/// would be able to know that only the headers need to be sent. Or, it can
/// also be checked after each `poll_data` call, to allow hyper to try to
/// end the underlying stream with the last chunk, instead of needing to
/// send an extra `DATA` frame just to mark the stream as finished.
///
/// As a hint, it is used to try to optimize, and thus is OK for a default
/// implementation to return `false`.
fn is_end_stream(&self) -> bool {
false
}
/// Return a length of the total bytes that will be streamed, if known.
///
/// If an exact size of bytes is known, this would allow hyper to send a
/// `Content-Length` header automatically, not needing to fall back to
/// `Transfer-Encoding: chunked`.
///
/// This does not need to be kept updated after polls, it will only be
/// called once to create the headers.
fn content_length(&self) -> Option<u64> {
None
}
}
impl<E: Entity> Entity for Box<E> {
type Data = E::Data;
type Error = E::Error;
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
(**self).poll_data()
}
fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
(**self).poll_trailers()
}
fn is_end_stream(&self) -> bool {
(**self).is_end_stream()
}
fn content_length(&self) -> Option<u64> {
(**self).content_length()
}
}
/// A wrapper to consume an `Entity` as a futures `Stream`.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct EntityStream<E> {
is_data_eof: bool,
entity: E,
}
impl<E: Entity> Stream for EntityStream<E> {
type Item = E::Data;
type Error = E::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
if self.is_data_eof {
return self.entity.poll_trailers()
.map(|async| {
async.map(|_opt| {
// drop the trailers and return that Stream is done
None
})
});
}
let opt = try_ready!(self.entity.poll_data());
if let Some(data) = opt {
return Ok(Async::Ready(Some(data)));
} else {
self.is_data_eof = true;
}
}
}
}
/// An `Entity` of `Chunk`s, used when receiving bodies.
///
/// Also a good default `Entity` to use in many applications.
#[must_use = "streams do nothing unless polled"]
pub struct Body {
kind: Kind,
}
#[derive(Debug)]
enum Kind {
Chan {
close_tx: oneshot::Sender<bool>,
_close_tx: oneshot::Sender<()>,
rx: mpsc::Receiver<Result<Chunk, ::Error>>,
},
Wrapped(Box<Stream<Item=Chunk, Error=::Error> + Send>),
Once(Option<Chunk>),
Empty,
}
//pub(crate)
/// A sender half used with `Body::channel()`.
#[derive(Debug)]
pub struct ChunkSender {
close_rx: oneshot::Receiver<bool>,
close_rx_check: bool,
pub struct Sender {
close_rx: oneshot::Receiver<()>,
tx: BodySender,
}
impl Body {
/// Return an empty body stream
/// Create an empty `Body` stream.
///
/// # Example
///
/// ```
/// use hyper::{Body, Request};
///
/// // create a `GET /` request
/// let get = Request::new(Body::empty());
/// ```
#[inline]
pub fn empty() -> Body {
Body::new(Kind::Empty)
}
/// Return a body stream with an associated sender half
/// Create a `Body` stream with an associated sender half.
#[inline]
pub fn pair() -> (mpsc::Sender<Result<Chunk, ::Error>>, Body) {
let (tx, rx) = channel();
(tx.tx, rx)
pub fn channel() -> (Sender, Body) {
let (tx, rx) = mpsc::channel(0);
let (close_tx, close_rx) = oneshot::channel();
let tx = Sender {
close_rx: close_rx,
tx: tx,
};
let rx = Body::new(Kind::Chan {
_close_tx: close_tx,
rx: rx,
});
(tx, rx)
}
/// Wrap a futures `Stream` in a box inside `Body`.
///
/// # Example
///
/// ```
/// # extern crate futures;
/// # extern crate hyper;
/// # use hyper::Body;
/// # fn main() {
/// let chunks = vec![
/// "hello",
/// " ",
/// "world",
/// ];
/// let stream = futures::stream::iter_ok(chunks);
///
/// let body = Body::wrap_stream(stream);
/// # }
/// ```
pub fn wrap_stream<S>(stream: S) -> Body
where
S: Stream<Error=::Error> + Send + 'static,
Chunk: From<S::Item>,
{
Body::new(Kind::Wrapped(Box::new(stream.map(Chunk::from))))
}
/// Convert this `Body` into a `Stream<Item=Chunk, Error=hyper::Error>`.
///
/// # Example
///
/// ```
/// # extern crate futures;
/// # extern crate hyper;
/// # use futures::{Future, Stream};
/// # use hyper::{Body, Request};
/// # fn request_concat(some_req: Request<Body>) {
/// let req: Request<Body> = some_req;
/// let body = req.into_body();
///
/// let stream = body.into_stream();
/// stream.concat2()
/// .map(|buf| {
/// println!("body length: {}", buf.len());
/// });
/// # }
/// # fn main() {}
/// ```
#[inline]
pub fn into_stream(self) -> EntityStream<Body> {
EntityStream {
is_data_eof: false,
entity: self,
}
}
/// Returns if this body was constructed via `Body::empty()`.
@@ -68,19 +254,6 @@ impl Body {
kind: kind,
}
}
fn poll_inner(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.kind {
Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") {
Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))),
Async::Ready(Some(Err(err))) => Err(err),
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
},
Kind::Once(ref mut val) => Ok(Async::Ready(val.take())),
Kind::Empty => Ok(Async::Ready(None)),
}
}
}
impl Default for Body {
@@ -90,72 +263,87 @@ impl Default for Body {
}
}
impl Stream for Body {
type Item = Chunk;
impl Entity for Body {
type Data = Chunk;
type Error = ::Error;
#[inline]
fn poll(&mut self) -> Poll<Option<Chunk>, ::Error> {
self.poll_inner()
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
match self.kind {
Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") {
Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))),
Async::Ready(Some(Err(err))) => Err(err),
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
},
Kind::Wrapped(ref mut s) => s.poll(),
Kind::Once(ref mut val) => Ok(Async::Ready(val.take())),
Kind::Empty => Ok(Async::Ready(None)),
}
}
fn is_end_stream(&self) -> bool {
match self.kind {
Kind::Chan { .. } => false,
Kind::Wrapped(..) => false,
Kind::Once(ref val) => val.is_none(),
Kind::Empty => true
}
}
fn content_length(&self) -> Option<u64> {
match self.kind {
Kind::Chan { .. } => None,
Kind::Wrapped(..) => None,
Kind::Once(Some(ref val)) => Some(val.len() as u64),
Kind::Once(None) => None,
Kind::Empty => Some(0)
}
}
}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("Body")
.field(&self.kind)
f.debug_struct("Body")
.finish()
}
}
//pub(crate)
pub fn channel() -> (ChunkSender, Body) {
let (tx, rx) = mpsc::channel(0);
let (close_tx, close_rx) = oneshot::channel();
let tx = ChunkSender {
close_rx: close_rx,
close_rx_check: true,
tx: tx,
};
let rx = Body::new(Kind::Chan {
close_tx: close_tx,
rx: rx,
});
(tx, rx)
}
impl ChunkSender {
impl Sender {
/// Check to see if this `Sender` can send more data.
pub fn poll_ready(&mut self) -> Poll<(), ()> {
if self.close_rx_check {
match self.close_rx.poll() {
Ok(Async::Ready(true)) | Err(_) => return Err(()),
Ok(Async::Ready(false)) => {
// needed to allow converting into a plain mpsc::Receiver
// if it has been, the tx will send false to disable this check
self.close_rx_check = false;
}
Ok(Async::NotReady) => (),
}
match self.close_rx.poll() {
Ok(Async::Ready(())) | Err(_) => return Err(()),
Ok(Async::NotReady) => (),
}
self.tx.poll_ready().map_err(|_| ())
}
pub fn start_send(&mut self, msg: Result<Chunk, ::Error>) -> StartSend<(), ()> {
match self.tx.start_send(msg) {
Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(())),
Err(_) => Err(()),
}
/// Sends data on this channel.
///
/// This should be called after `poll_ready` indicated the channel
/// could accept another `Chunk`.
///
/// Returns `Err(Chunk)` if the channel could not (currently) accept
/// another `Chunk`.
pub fn send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> {
self.tx.try_send(Ok(chunk))
.map_err(|err| err.into_inner().expect("just sent Ok"))
}
pub(crate) fn send_error(&mut self, err: ::Error) {
let _ = self.tx.try_send(Err(err));
}
}
impl From<Chunk> for Body {
#[inline]
fn from (chunk: Chunk) -> Body {
Body::new(Kind::Once(Some(chunk)))
fn from(chunk: Chunk) -> Body {
if chunk.is_empty() {
Body::empty()
} else {
Body::new(Kind::Once(Some(chunk)))
}
}
}
@@ -214,13 +402,6 @@ impl From<Cow<'static, str>> for Body {
}
}
impl From<Option<Body>> for Body {
#[inline]
fn from (body: Option<Body>) -> Body {
body.unwrap_or_default()
}
}
fn _assert_send_sync() {
fn _assert_send<T: Send>() {}
fn _assert_sync<T: Sync>() {}
@@ -232,15 +413,14 @@ fn _assert_send_sync() {
#[test]
fn test_body_stream_concat() {
use futures::{Sink, Stream, Future};
let (tx, body) = Body::pair();
use futures::{Stream, Future};
::std::thread::spawn(move || {
let tx = tx.send(Ok("hello ".into())).wait().unwrap();
tx.send(Ok("world".into())).wait().unwrap();
});
let body = Body::from("hello world");
let total = body.concat2().wait().unwrap();
let total = body.into_stream()
.concat2()
.wait()
.unwrap();
assert_eq!(total.as_ref(), b"hello world");
}

View File

@@ -8,7 +8,7 @@ use futures::task::Task;
use http::{Method, Version};
use tokio_io::{AsyncRead, AsyncWrite};
use proto::{Chunk, Decode, Http1Transaction, MessageHead};
use proto::{BodyLength, Chunk, Decode, Http1Transaction, MessageHead};
use super::io::{Cursor, Buffered};
use super::{EncodedBuf, Encoder, Decoder};
@@ -418,7 +418,7 @@ where I: AsyncRead + AsyncWrite,
self.io.can_buffer()
}
pub fn write_head(&mut self, mut head: MessageHead<T::Outgoing>, body: bool) {
pub fn write_head(&mut self, mut head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
debug_assert!(self.can_write_head());
if !T::should_read_first() {
@@ -541,7 +541,7 @@ where I: AsyncRead + AsyncWrite,
match self.state.writing {
Writing::Init => {
if let Some(msg) = T::on_error(&err) {
self.write_head(msg, false);
self.write_head(msg, None);
self.state.error = Some(err);
return Ok(());
}

View File

@@ -1,17 +1,18 @@
use std::io;
use bytes::Bytes;
use futures::{Async, AsyncSink, Future, Poll, Stream};
use futures::{Async, Future, Poll, Stream};
use http::{Request, Response, StatusCode};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_service::Service;
use proto::{Body, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead};
use proto::body::Entity;
use proto::{Body, BodyLength, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead};
pub struct Dispatcher<D, Bs, I, B, T> {
conn: Conn<I, B, T>,
dispatch: D,
body_tx: Option<::proto::body::ChunkSender>,
body_tx: Option<::proto::body::Sender>,
body_rx: Option<Bs>,
is_closing: bool,
}
@@ -46,7 +47,7 @@ where
I: AsyncRead + AsyncWrite,
B: AsRef<[u8]>,
T: Http1Transaction,
Bs: Stream<Item=B, Error=::Error>,
Bs: Entity<Data=B, Error=::Error>,
{
pub fn new(dispatch: D, conn: Conn<I, B, T>) -> Self {
Dispatcher {
@@ -130,13 +131,10 @@ where
}
match self.conn.read_body() {
Ok(Async::Ready(Some(chunk))) => {
match body.start_send(Ok(chunk)) {
Ok(AsyncSink::Ready) => {
match body.send_data(chunk) {
Ok(()) => {
self.body_tx = Some(body);
},
Ok(AsyncSink::NotReady(_chunk)) => {
unreachable!("mpsc poll_ready was ready, start_send was not");
}
Err(_canceled) => {
if self.conn.can_read_body() {
trace!("body receiver dropped before eof, closing");
@@ -154,7 +152,7 @@ where
return Ok(Async::NotReady);
}
Err(e) => {
let _ = body.start_send(Err(::Error::Io(e)));
body.send_error(::Error::Io(e));
}
}
} else {
@@ -181,7 +179,7 @@ where
match self.conn.read_head() {
Ok(Async::Ready(Some((head, has_body)))) => {
let body = if has_body {
let (mut tx, rx) = ::proto::body::channel();
let (mut tx, rx) = Body::channel();
let _ = tx.poll_ready(); // register this task if rx is dropped
self.body_tx = Some(tx);
rx
@@ -213,7 +211,12 @@ where
return Ok(Async::Ready(()));
} else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() {
if let Some((head, body)) = try_ready!(self.dispatch.poll_msg()) {
self.conn.write_head(head, body.is_some());
let body_type = body.as_ref().map(|body| {
body.content_length()
.map(BodyLength::Known)
.unwrap_or(BodyLength::Unknown)
});
self.conn.write_head(head, body_type);
self.body_rx = body;
} else {
self.close();
@@ -222,7 +225,7 @@ where
} else if !self.conn.can_buffer_body() {
try_ready!(self.poll_flush());
} else if let Some(mut body) = self.body_rx.take() {
let chunk = match body.poll()? {
let chunk = match body.poll_data()? {
Async::Ready(Some(chunk)) => {
self.body_rx = Some(body);
chunk
@@ -291,7 +294,7 @@ where
I: AsyncRead + AsyncWrite,
B: AsRef<[u8]>,
T: Http1Transaction,
Bs: Stream<Item=B, Error=::Error>,
Bs: Entity<Data=B, Error=::Error>,
{
type Item = ();
type Error = ::Error;
@@ -316,8 +319,7 @@ impl<S> Server<S> where S: Service {
impl<S, Bs> Dispatch for Server<S>
where
S: Service<Request=Request<Body>, Response=Response<Bs>, Error=::Error>,
Bs: Stream<Error=::Error>,
Bs::Item: AsRef<[u8]>,
Bs: Entity<Error=::Error>,
{
type PollItem = MessageHead<StatusCode>;
type PollBody = Bs;
@@ -338,7 +340,12 @@ where
subject: parts.status,
headers: parts.headers,
};
Ok(Async::Ready(Some((head, Some(body)))))
let body = if body.is_end_stream() {
None
} else {
Some(body)
};
Ok(Async::Ready(Some((head, body))))
} else {
unreachable!("poll_msg shouldn't be called if no inflight");
}
@@ -382,8 +389,7 @@ impl<B> Client<B> {
impl<B> Dispatch for Client<B>
where
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
B: Entity<Error=::Error>,
{
type PollItem = RequestHead;
type PollBody = B;
@@ -405,8 +411,14 @@ where
subject: RequestLine(parts.method, parts.uri),
headers: parts.headers,
};
let body = if body.is_end_stream() {
None
} else {
Some(body)
};
self.callback = Some(cb);
Ok(Async::Ready(Some((head, Some(body)))))
Ok(Async::Ready(Some((head, body))))
}
}
},

View File

@@ -1,12 +1,12 @@
use std::fmt::{self, Write};
use bytes::{BytesMut, Bytes};
use http::header::{CONTENT_LENGTH, DATE, HeaderName, HeaderValue, TRANSFER_ENCODING};
use http::header::{CONTENT_LENGTH, DATE, Entry, HeaderName, HeaderValue, TRANSFER_ENCODING};
use http::{HeaderMap, Method, StatusCode, Uri, Version};
use httparse;
use headers;
use proto::{Decode, MessageHead, Http1Transaction, ParseResult, RequestLine, RequestHead};
use proto::{BodyLength, Decode, MessageHead, Http1Transaction, ParseResult, RequestLine, RequestHead};
use proto::h1::{Encoder, Decoder, date};
const MAX_HEADERS: usize = 100;
@@ -122,8 +122,13 @@ where
}
fn encode(mut head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> ::Result<Encoder> {
trace!("Server::encode has_body={}, method={:?}", has_body, method);
fn encode(
mut head: MessageHead<Self::Outgoing>,
body: Option<BodyLength>,
method: &mut Option<Method>,
dst: &mut Vec<u8>,
) -> ::Result<Encoder> {
trace!("Server::encode body={:?}, method={:?}", body, method);
// hyper currently doesn't support returning 1xx status codes as a Response
// This is because Service only allows returning a single Response, and
@@ -132,7 +137,7 @@ where
let ret = if StatusCode::SWITCHING_PROTOCOLS == head.subject {
T::on_encode_upgrade(&mut head)
.map(|_| {
let mut enc = Server::set_length(&mut head, has_body, method.as_ref());
let mut enc = Server::set_length(&mut head, body, method.as_ref());
enc.set_last();
enc
})
@@ -143,7 +148,7 @@ where
headers::content_length_zero(&mut head.headers);
Err(::Error::Status)
} else {
Ok(Server::set_length(&mut head, has_body, method.as_ref()))
Ok(Server::set_length(&mut head, body, method.as_ref()))
};
@@ -160,6 +165,7 @@ where
extend(dst, head.subject.as_str().as_bytes());
extend(dst, b" ");
// a reason MUST be written, as many parsers will expect it.
extend(dst, head.subject.canonical_reason().unwrap_or("<none>").as_bytes());
extend(dst, b"\r\n");
}
@@ -207,7 +213,7 @@ where
}
impl Server<()> {
fn set_length(head: &mut MessageHead<StatusCode>, has_body: bool, method: Option<&Method>) -> Encoder {
fn set_length(head: &mut MessageHead<StatusCode>, body: Option<BodyLength>, method: Option<&Method>) -> Encoder {
// these are here thanks to borrowck
// `if method == Some(&Method::Get)` says the RHS doesn't live long enough
const HEAD: Option<&'static Method> = Some(&Method::HEAD);
@@ -230,8 +236,8 @@ impl Server<()> {
}
};
if has_body && can_have_body {
set_length(&mut head.headers, head.version == Version::HTTP_11)
if let (Some(body), true) = (body, can_have_body) {
set_length(&mut head.headers, body, head.version == Version::HTTP_11)
} else {
head.headers.remove(TRANSFER_ENCODING);
if can_have_body {
@@ -355,12 +361,17 @@ where
}
}
fn encode(mut head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> ::Result<Encoder> {
trace!("Client::encode has_body={}, method={:?}", has_body, method);
fn encode(
mut head: MessageHead<Self::Outgoing>,
body: Option<BodyLength>,
method: &mut Option<Method>,
dst: &mut Vec<u8>,
) -> ::Result<Encoder> {
trace!("Client::encode body={:?}, method={:?}", body, method);
*method = Some(head.subject.0.clone());
let body = Client::set_length(&mut head, has_body);
let body = Client::set_length(&mut head, body);
let init_cap = 30 + head.headers.len() * AVERAGE_HEADER_SIZE;
dst.reserve(init_cap);
@@ -399,33 +410,143 @@ where
}
impl Client<()> {
fn set_length(head: &mut RequestHead, has_body: bool) -> Encoder {
if has_body {
fn set_length(head: &mut RequestHead, body: Option<BodyLength>) -> Encoder {
if let Some(body) = body {
let can_chunked = head.version == Version::HTTP_11
&& (head.subject.0 != Method::HEAD)
&& (head.subject.0 != Method::GET)
&& (head.subject.0 != Method::CONNECT);
set_length(&mut head.headers, can_chunked)
set_length(&mut head.headers, body, can_chunked)
} else {
head.headers.remove(CONTENT_LENGTH);
head.headers.remove(TRANSFER_ENCODING);
Encoder::length(0)
}
}
}
fn set_length(headers: &mut HeaderMap, can_chunked: bool) -> Encoder {
let len = headers::content_length_parse(&headers);
fn set_length(headers: &mut HeaderMap, body: BodyLength, can_chunked: bool) -> Encoder {
// If the user already set specific headers, we should respect them, regardless
// of what the Entity knows about itself. They set them for a reason.
if let Some(len) = len {
Encoder::length(len)
} else if can_chunked {
//TODO: maybe not overwrite existing transfer-encoding
headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
Encoder::chunked()
// Because of the borrow checker, we can't check the for an existing
// Content-Length header while holding an `Entry` for the Transfer-Encoding
// header, so unfortunately, we must do the check here, first.
let existing_con_len = headers::content_length_parse(headers);
let mut should_remove_con_len = false;
if can_chunked {
// If the user set a transfer-encoding, respect that. Let's just
// make sure `chunked` is the final encoding.
let encoder = match headers.entry(TRANSFER_ENCODING)
.expect("TRANSFER_ENCODING is valid HeaderName") {
Entry::Occupied(te) => {
should_remove_con_len = true;
if headers::is_chunked(te.iter()) {
Some(Encoder::chunked())
} else {
warn!("user provided transfer-encoding does not end in 'chunked'");
// There's a Transfer-Encoding, but it doesn't end in 'chunked'!
// An example that could trigger this:
//
// Transfer-Encoding: gzip
//
// This can be bad, depending on if this is a request or a
// response.
//
// - A request is illegal if there is a `Transfer-Encoding`
// but it doesn't end in `chunked`.
// - A response that has `Transfer-Encoding` but doesn't
// end in `chunked` isn't illegal, it just forces this
// to be close-delimited.
//
// We can try to repair this, by adding `chunked` ourselves.
headers::add_chunked(te);
Some(Encoder::chunked())
}
},
Entry::Vacant(te) => {
if let Some(len) = existing_con_len {
Some(Encoder::length(len))
} else if let BodyLength::Unknown = body {
should_remove_con_len = true;
te.insert(HeaderValue::from_static("chunked"));
Some(Encoder::chunked())
} else {
None
}
},
};
// This is because we need a second mutable borrow to remove
// content-length header.
if let Some(encoder) = encoder {
if should_remove_con_len && existing_con_len.is_some() {
headers.remove(CONTENT_LENGTH);
}
return encoder;
}
// User didn't set transfer-encoding, AND we know body length,
// so we can just set the Content-Length automatically.
let len = if let BodyLength::Known(len) = body {
len
} else {
unreachable!("BodyLength::Unknown would set chunked");
};
set_content_length(headers, len)
} else {
headers.remove(TRANSFER_ENCODING);
Encoder::eof()
// Chunked isn't legal, so if it is set, we need to remove it.
// Also, if it *is* set, then we shouldn't replace with a length,
// since the user tried to imply there isn't a length.
let encoder = if headers.remove(TRANSFER_ENCODING).is_some() {
trace!("removing illegal transfer-encoding header");
should_remove_con_len = true;
Encoder::eof()
} else if let Some(len) = existing_con_len {
Encoder::length(len)
} else if let BodyLength::Known(len) = body {
set_content_length(headers, len)
} else {
Encoder::eof()
};
if should_remove_con_len && existing_con_len.is_some() {
headers.remove(CONTENT_LENGTH);
}
encoder
}
}
fn set_content_length(headers: &mut HeaderMap, len: u64) -> Encoder {
// At this point, there should not be a valid Content-Length
// header. However, since we'll be indexing in anyways, we can
// warn the user if there was an existing illegal header.
match headers.entry(CONTENT_LENGTH)
.expect("CONTENT_LENGTH is valid HeaderName") {
Entry::Occupied(mut cl) => {
// Uh oh, the user set `Content-Length` headers, but set bad ones.
// This would be an illegal message anyways, so let's try to repair
// with our known good length.
warn!("user provided content-length header was invalid");
// Internal sanity check, we should have already determined
// that the header was illegal before calling this function.
debug_assert!(headers::content_length_parse_all(cl.iter()).is_none());
cl.insert(headers::content_length_value(len));
Encoder::length(len)
},
Entry::Vacant(cl) => {
cl.insert(headers::content_length_value(len));
Encoder::length(len)
}
}
}
@@ -573,6 +694,7 @@ mod tests {
}
}
#[test]
fn test_parse_request() {
extern crate pretty_env_logger;

View File

@@ -8,7 +8,7 @@ pub use self::body::Body;
pub use self::chunk::Chunk;
pub use self::h1::{dispatch, Conn};
mod body;
pub mod body;
mod chunk;
mod h1;
//mod h2;
@@ -72,7 +72,12 @@ pub trait Http1Transaction {
type Outgoing: Default;
fn parse(bytes: &mut BytesMut) -> ParseResult<Self::Incoming>;
fn decoder(head: &MessageHead<Self::Incoming>, method: &mut Option<Method>) -> ::Result<Decode>;
fn encode(head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> ::Result<h1::Encoder>;
fn encode(
head: MessageHead<Self::Outgoing>,
body: Option<BodyLength>,
method: &mut Option<Method>,
dst: &mut Vec<u8>,
) -> ::Result<h1::Encoder>;
fn on_error(err: &::Error) -> Option<MessageHead<Self::Outgoing>>;
fn should_error_on_parse_eof() -> bool;
@@ -81,6 +86,15 @@ pub trait Http1Transaction {
pub type ParseResult<T> = ::Result<Option<(MessageHead<T>, usize)>>;
#[derive(Debug)]
pub enum BodyLength {
/// Content-Length
Known(u64),
/// Transfer-Encoding: chunked (if h1)
Unknown,
}
#[derive(Debug)]
pub enum Decode {
/// Decode normally.