diff --git a/src/body.rs b/src/body.rs index eee5cab..516e642 100644 --- a/src/body.rs +++ b/src/body.rs @@ -209,30 +209,41 @@ impl Sender { use bytes::{BufMut, BytesMut}; use futures::future; + let con_len = self.body.1; let cap = cmp::min(self.body.1.unwrap_or(8192), 8192); + let mut written = 0; let mut buf = BytesMut::with_capacity(cap as usize); let mut body = self.body.0; // Put in an option so that it can be consumed on error to call abort() let mut tx = Some(self.tx); future::poll_fn(move || loop { + if Some(written) == con_len { + // Written up to content-length, so stop. + return Ok(().into()); + } + try_ready!(tx .as_mut() .expect("tx only taken on error") .poll_ready() .map_err(::error::from)); + if buf.remaining_mut() == 0 { + buf.reserve(8192); + } + match body.read(unsafe { buf.bytes_mut() }) { - Ok(0) => return Ok(().into()), + Ok(0) => { + return Ok(().into()) + }, Ok(n) => { unsafe { buf.advance_mut(n); } + written += n as u64; let tx = tx.as_mut().expect("tx only taken on error"); if let Err(_) = tx.send_data(buf.take().freeze().into()) { return Err(::error::timedout(None)); } - if buf.remaining_mut() == 0 { - buf.reserve(8192); - } } Err(e) => { let ret = io::Error::new(e.kind(), e.to_string()); diff --git a/src/client.rs b/src/client.rs index 3067c83..b473952 100644 --- a/src/client.rs +++ b/src/client.rs @@ -438,8 +438,7 @@ impl ClientHandle { // work is Future<(), ()>, and our closure will never return Err - rt.spawn(work) - .run() + rt.block_on(work) .expect("runtime unexpected error"); })); diff --git a/tests/multipart.rs b/tests/multipart.rs index bf07613..d4f3377 100644 --- a/tests/multipart.rs +++ b/tests/multipart.rs @@ -5,7 +5,7 @@ extern crate reqwest; mod support; #[test] -fn test_multipart() { +fn text_part() { let _ = env_logger::try_init(); let form = reqwest::multipart::Form::new() diff --git a/tests/timeouts.rs b/tests/timeouts.rs index 614e83f..70c8ac2 100644 --- a/tests/timeouts.rs +++ b/tests/timeouts.rs @@ -1,3 +1,4 @@ +extern crate env_logger; extern crate reqwest; #[macro_use] @@ -8,34 +9,40 @@ use std::time::Duration; #[test] fn test_write_timeout() { + let _ = env_logger::try_init(); + let body = String::from_utf8(vec![b'x'; 20_000]).unwrap(); + let len = 8192; + let server = server! { - request: b"\ + request: format!("\ POST /write-timeout HTTP/1.1\r\n\ user-agent: $USERAGENT\r\n\ accept: */*\r\n\ - content-length: 5\r\n\ + content-length: {}\r\n\ accept-encoding: gzip\r\n\ host: $HOST\r\n\ \r\n\ - Hello\ - ", + {}\ + ", body.len(), body), response: b"\ HTTP/1.1 200 OK\r\n\ Content-Length: 5\r\n\ \r\n\ Hello\ ", - read_timeout: Duration::from_secs(1) + read_timeout: Duration::from_secs(2) + + //response_timeout: Duration::from_secs(1) }; + let cursor = ::std::io::Cursor::new(body.into_bytes()); let url = format!("http://{}/write-timeout", server.addr()); let err = reqwest::Client::builder() .timeout(Duration::from_millis(500)) .build() .unwrap() .post(&url) - .header(reqwest::header::CONTENT_LENGTH, reqwest::header::HeaderValue::from_static("5")) - .body(reqwest::Body::new(&b"Hello"[..])) + .body(reqwest::Body::sized(cursor, len as u64)) .send() .unwrap_err(); @@ -46,6 +53,7 @@ fn test_write_timeout() { #[test] fn test_response_timeout() { + let _ = env_logger::try_init(); let server = server! { request: b"\ GET /response-timeout HTTP/1.1\r\n\ @@ -77,6 +85,7 @@ fn test_response_timeout() { #[test] fn test_read_timeout() { + let _ = env_logger::try_init(); let server = server! { request: b"\ GET /read-timeout HTTP/1.1\r\n\