fix streaming body from wrongly returning 'closed connection' error
This commit is contained in:
19
src/body.rs
19
src/body.rs
@@ -209,30 +209,41 @@ impl Sender {
|
|||||||
use bytes::{BufMut, BytesMut};
|
use bytes::{BufMut, BytesMut};
|
||||||
use futures::future;
|
use futures::future;
|
||||||
|
|
||||||
|
let con_len = self.body.1;
|
||||||
let cap = cmp::min(self.body.1.unwrap_or(8192), 8192);
|
let cap = cmp::min(self.body.1.unwrap_or(8192), 8192);
|
||||||
|
let mut written = 0;
|
||||||
let mut buf = BytesMut::with_capacity(cap as usize);
|
let mut buf = BytesMut::with_capacity(cap as usize);
|
||||||
let mut body = self.body.0;
|
let mut body = self.body.0;
|
||||||
// Put in an option so that it can be consumed on error to call abort()
|
// Put in an option so that it can be consumed on error to call abort()
|
||||||
let mut tx = Some(self.tx);
|
let mut tx = Some(self.tx);
|
||||||
|
|
||||||
future::poll_fn(move || loop {
|
future::poll_fn(move || loop {
|
||||||
|
if Some(written) == con_len {
|
||||||
|
// Written up to content-length, so stop.
|
||||||
|
return Ok(().into());
|
||||||
|
}
|
||||||
|
|
||||||
try_ready!(tx
|
try_ready!(tx
|
||||||
.as_mut()
|
.as_mut()
|
||||||
.expect("tx only taken on error")
|
.expect("tx only taken on error")
|
||||||
.poll_ready()
|
.poll_ready()
|
||||||
.map_err(::error::from));
|
.map_err(::error::from));
|
||||||
|
|
||||||
|
if buf.remaining_mut() == 0 {
|
||||||
|
buf.reserve(8192);
|
||||||
|
}
|
||||||
|
|
||||||
match body.read(unsafe { buf.bytes_mut() }) {
|
match body.read(unsafe { buf.bytes_mut() }) {
|
||||||
Ok(0) => return Ok(().into()),
|
Ok(0) => {
|
||||||
|
return Ok(().into())
|
||||||
|
},
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
unsafe { buf.advance_mut(n); }
|
unsafe { buf.advance_mut(n); }
|
||||||
|
written += n as u64;
|
||||||
let tx = tx.as_mut().expect("tx only taken on error");
|
let tx = tx.as_mut().expect("tx only taken on error");
|
||||||
if let Err(_) = tx.send_data(buf.take().freeze().into()) {
|
if let Err(_) = tx.send_data(buf.take().freeze().into()) {
|
||||||
return Err(::error::timedout(None));
|
return Err(::error::timedout(None));
|
||||||
}
|
}
|
||||||
if buf.remaining_mut() == 0 {
|
|
||||||
buf.reserve(8192);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let ret = io::Error::new(e.kind(), e.to_string());
|
let ret = io::Error::new(e.kind(), e.to_string());
|
||||||
|
|||||||
@@ -438,8 +438,7 @@ impl ClientHandle {
|
|||||||
|
|
||||||
|
|
||||||
// work is Future<(), ()>, and our closure will never return Err
|
// work is Future<(), ()>, and our closure will never return Err
|
||||||
rt.spawn(work)
|
rt.block_on(work)
|
||||||
.run()
|
|
||||||
.expect("runtime unexpected error");
|
.expect("runtime unexpected error");
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ extern crate reqwest;
|
|||||||
mod support;
|
mod support;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_multipart() {
|
fn text_part() {
|
||||||
let _ = env_logger::try_init();
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
let form = reqwest::multipart::Form::new()
|
let form = reqwest::multipart::Form::new()
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
extern crate env_logger;
|
||||||
extern crate reqwest;
|
extern crate reqwest;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
@@ -8,34 +9,40 @@ use std::time::Duration;
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_write_timeout() {
|
fn test_write_timeout() {
|
||||||
|
let _ = env_logger::try_init();
|
||||||
|
let body = String::from_utf8(vec![b'x'; 20_000]).unwrap();
|
||||||
|
let len = 8192;
|
||||||
|
|
||||||
let server = server! {
|
let server = server! {
|
||||||
request: b"\
|
request: format!("\
|
||||||
POST /write-timeout HTTP/1.1\r\n\
|
POST /write-timeout HTTP/1.1\r\n\
|
||||||
user-agent: $USERAGENT\r\n\
|
user-agent: $USERAGENT\r\n\
|
||||||
accept: */*\r\n\
|
accept: */*\r\n\
|
||||||
content-length: 5\r\n\
|
content-length: {}\r\n\
|
||||||
accept-encoding: gzip\r\n\
|
accept-encoding: gzip\r\n\
|
||||||
host: $HOST\r\n\
|
host: $HOST\r\n\
|
||||||
\r\n\
|
\r\n\
|
||||||
Hello\
|
{}\
|
||||||
",
|
", body.len(), body),
|
||||||
response: b"\
|
response: b"\
|
||||||
HTTP/1.1 200 OK\r\n\
|
HTTP/1.1 200 OK\r\n\
|
||||||
Content-Length: 5\r\n\
|
Content-Length: 5\r\n\
|
||||||
\r\n\
|
\r\n\
|
||||||
Hello\
|
Hello\
|
||||||
",
|
",
|
||||||
read_timeout: Duration::from_secs(1)
|
read_timeout: Duration::from_secs(2)
|
||||||
|
|
||||||
|
//response_timeout: Duration::from_secs(1)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let cursor = ::std::io::Cursor::new(body.into_bytes());
|
||||||
let url = format!("http://{}/write-timeout", server.addr());
|
let url = format!("http://{}/write-timeout", server.addr());
|
||||||
let err = reqwest::Client::builder()
|
let err = reqwest::Client::builder()
|
||||||
.timeout(Duration::from_millis(500))
|
.timeout(Duration::from_millis(500))
|
||||||
.build()
|
.build()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.post(&url)
|
.post(&url)
|
||||||
.header(reqwest::header::CONTENT_LENGTH, reqwest::header::HeaderValue::from_static("5"))
|
.body(reqwest::Body::sized(cursor, len as u64))
|
||||||
.body(reqwest::Body::new(&b"Hello"[..]))
|
|
||||||
.send()
|
.send()
|
||||||
.unwrap_err();
|
.unwrap_err();
|
||||||
|
|
||||||
@@ -46,6 +53,7 @@ fn test_write_timeout() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_response_timeout() {
|
fn test_response_timeout() {
|
||||||
|
let _ = env_logger::try_init();
|
||||||
let server = server! {
|
let server = server! {
|
||||||
request: b"\
|
request: b"\
|
||||||
GET /response-timeout HTTP/1.1\r\n\
|
GET /response-timeout HTTP/1.1\r\n\
|
||||||
@@ -77,6 +85,7 @@ fn test_response_timeout() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_read_timeout() {
|
fn test_read_timeout() {
|
||||||
|
let _ = env_logger::try_init();
|
||||||
let server = server! {
|
let server = server! {
|
||||||
request: b"\
|
request: b"\
|
||||||
GET /read-timeout HTTP/1.1\r\n\
|
GET /read-timeout HTTP/1.1\r\n\
|
||||||
|
|||||||
Reference in New Issue
Block a user