fix improper handling of request body backpressure

Closes #348
This commit is contained in:
Sean McArthur
2018-09-19 14:20:08 -07:00
parent 276c3d6594
commit 2698148743
8 changed files with 166 additions and 68 deletions

View File

@@ -3,6 +3,7 @@ use std::fmt;
use std::io::{self, Cursor, Read};
use bytes::Bytes;
use futures::Future;
use hyper::{self};
use {async_impl};
@@ -77,6 +78,37 @@ impl Body {
kind: Kind::Reader(Box::from(reader), Some(len)),
}
}
pub(crate) fn len(&self) -> Option<u64> {
match self.kind {
Kind::Reader(_, len) => len,
Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
}
}
pub(crate) fn into_reader(self) -> Reader {
match self.kind {
Kind::Reader(r, _) => Reader::Reader(r),
Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
}
}
pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
match self.kind {
Kind::Reader(read, len) => {
let (tx, rx) = hyper::Body::channel();
let tx = Sender {
body: (read, len),
tx: tx,
};
(Some(tx), async_impl::body::wrap(rx), len)
},
Kind::Bytes(chunk) => {
let len = chunk.len() as u64;
(None, async_impl::body::reusable(chunk), Some(len))
}
}
}
}
@@ -150,29 +182,11 @@ impl<'a> fmt::Debug for DebugLength<'a> {
}
}
// pub(crate)
pub fn len(body: &Body) -> Option<u64> {
match body.kind {
Kind::Reader(_, len) => len,
Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
}
}
pub enum Reader {
pub(crate) enum Reader {
Reader(Box<Read + Send>),
Bytes(Cursor<Bytes>),
}
#[inline]
pub fn reader(body: Body) -> Reader {
match body.kind {
Kind::Reader(r, _) => Reader::Reader(r),
Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
}
}
impl Read for Reader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self {
@@ -182,25 +196,37 @@ impl Read for Reader {
}
}
pub struct Sender {
pub(crate) struct Sender {
body: (Box<Read + Send>, Option<u64>),
tx: hyper::body::Sender,
}
impl Sender {
pub fn send(self) -> ::Result<()> {
// A `Future` that may do blocking read calls.
// As a `Future`, this integrates easily with `wait::timeout`.
pub(crate) fn send(self) -> impl Future<Item=(), Error=::Error> {
use std::cmp;
use bytes::{BufMut, BytesMut};
use futures::future;
let cap = cmp::min(self.body.1.unwrap_or(8192), 8192);
let mut buf = BytesMut::with_capacity(cap as usize);
let mut body = self.body.0;
let mut tx = self.tx;
loop {
// Put in an option so that it can be consumed on error to call abort()
let mut tx = Some(self.tx);
future::poll_fn(move || loop {
try_ready!(tx
.as_mut()
.expect("tx only taken on error")
.poll_ready()
.map_err(::error::from));
match body.read(unsafe { buf.bytes_mut() }) {
Ok(0) => return Ok(()),
Ok(0) => return Ok(().into()),
Ok(n) => {
unsafe { buf.advance_mut(n); }
let tx = tx.as_mut().expect("tx only taken on error");
if let Err(_) = tx.send_data(buf.take().freeze().into()) {
return Err(::error::timedout(None));
}
@@ -210,35 +236,20 @@ impl Sender {
}
Err(e) => {
let ret = io::Error::new(e.kind(), e.to_string());
tx.abort();
tx
.take()
.expect("tx only taken on error")
.abort();
return Err(::error::from(ret));
}
}
}
}
}
#[inline]
pub fn async(body: Body) -> (Option<Sender>, async_impl::Body, Option<u64>) {
match body.kind {
Kind::Reader(read, len) => {
let (tx, rx) = hyper::Body::channel();
let tx = Sender {
body: (read, len),
tx: tx,
};
(Some(tx), async_impl::body::wrap(rx), len)
},
Kind::Bytes(chunk) => {
let len = chunk.len() as u64;
(None, async_impl::body::reusable(chunk), Some(len))
}
})
}
}
// useful for tests, but not publicly exposed
#[cfg(test)]
pub fn read_to_string(mut body: Body) -> io::Result<String> {
pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
let mut s = String::new();
match body.kind {
Kind::Reader(ref mut reader, _) => reader.read_to_string(&mut s),

View File

@@ -4,6 +4,7 @@ use std::time::Duration;
use std::thread;
use futures::{Future, Stream};
use futures::future::{self, Either};
use futures::sync::{mpsc, oneshot};
use request::{Request, RequestBuilder};
@@ -466,20 +467,29 @@ impl ClientHandle {
.unbounded_send((req, tx))
.expect("core thread panicked");
if let Some(body) = body {
try_!(body.send(), &url);
}
let write = if let Some(body) = body {
Either::A(body.send())
//try_!(body.send(self.timeout.0), &url);
} else {
Either::B(future::ok(()))
};
let res = match wait::timeout(rx, self.timeout.0) {
let rx = rx.map_err(|_canceled| {
// The only possible reason there would be a Canceled error
// is if the thread running the event loop panicked. We could return
// an Err here, like a BrokenPipe, but the Client is not
// recoverable. Additionally, the panic in the other thread
// is not normal, and should likely be propagated.
panic!("event loop thread panicked");
});
let fut = write.join(rx).map(|((), res)| res);
let res = match wait::timeout(fut, self.timeout.0) {
Ok(res) => res,
Err(wait::Waited::TimedOut) => return Err(::error::timedout(Some(url))),
Err(wait::Waited::Err(_canceled)) => {
// The only possible reason there would be a Cancelled error
// is if the thread running the Core panicked. We could return
// an Err here, like a BrokenPipe, but the Client is not
// recoverable. Additionally, the panic in the other thread
// is not normal, and should likely be propagated.
panic!("core thread panicked");
Err(wait::Waited::Err(err)) => {
return Err(err.with_url(url));
}
};
res.map(|res| {

View File

@@ -82,6 +82,12 @@ impl Error {
self.url.as_ref()
}
pub(crate) fn with_url(mut self, url: Url) -> Error {
debug_assert_eq!(self.url, None, "with_url overriding existing url");
self.url = Some(url);
self
}
/// Returns a reference to the internal error, if available.
///
/// The `'static` bounds allows using `downcast_ref` to check the

View File

@@ -93,7 +93,7 @@ impl Form {
pub(crate) fn compute_length(&mut self) -> Option<u64> {
let mut length = 0u64;
for &(ref name, ref field) in self.fields.iter() {
match ::body::len(&field.value) {
match field.value.len() {
Some(value_length) => {
// We are constructing the header just to get its length. To not have to
// construct it again when the request is sent we cache these headers.
@@ -272,7 +272,7 @@ impl Reader {
});
let reader = boundary
.chain(header)
.chain(::body::reader(field.value))
.chain(field.value.into_reader())
.chain(Cursor::new("\r\n"));
// According to https://tools.ietf.org/html/rfc2046#section-5.1.1
// the very last field has a special boundary

View File

@@ -86,9 +86,9 @@ impl Request {
let mut req_async = self.inner;
let body = self.body.and_then(|body| {
let (tx, body, len) = body::async(body);
let (tx, body, len) = body.into_async();
if let Some(len) = len {
req_async.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_str(len.to_string().as_str()).expect(""));
req_async.headers_mut().insert(CONTENT_LENGTH, len.into());
}
*req_async.body_mut() = Some(body);
tx