diff --git a/Cargo.toml b/Cargo.toml index 67c1990..50db374 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,9 +53,11 @@ time = "0.1.42" [dev-dependencies] env_logger = "0.6" serde = { version = "1.0", features = ["derive"] } +tokio = { version = "0.1.7", default-features = false, features = ["rt-full", "tcp", "fs"] } tokio-tcp = "0.1" libflate = "0.1" doc-comment = "0.3" +bytes = "0.4" [features] default = ["default-tls"] diff --git a/examples/async_stream.rs b/examples/async_stream.rs new file mode 100644 index 0000000..3081483 --- /dev/null +++ b/examples/async_stream.rs @@ -0,0 +1,81 @@ +#![deny(warnings)] + +#[macro_use] +extern crate futures; +extern crate bytes; +extern crate reqwest; +extern crate tokio; +extern crate tokio_threadpool; + +use std::io::{self, Cursor}; +use std::mem; +use std::path::Path; + +use bytes::Bytes; +use futures::{Async, Future, Poll, Stream}; +use reqwest::async::{Client, Decoder}; +use tokio::fs::File; +use tokio::io::AsyncRead; + +const CHUNK_SIZE: usize = 1024; + +struct FileSource { + inner: File, +} + +impl FileSource { + fn new(file: File) -> FileSource { + FileSource { inner: file } + } +} + +impl Stream for FileSource { + type Item = Bytes; + type Error = io::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let mut buf = [0; CHUNK_SIZE]; + let size = try_ready!(self.inner.poll_read(&mut buf)); + if size > 0 { + Ok(Async::Ready(Some(buf[0..size].into()))) + } else { + Ok(Async::Ready(None)) + } + } +} + +fn post

(path: P) -> impl Future +where + P: AsRef, +{ + File::open(path.as_ref().to_owned()) + .map_err(|err| println!("request error: {}", err)) + .and_then(|file| { + let source: Box + Send> = + Box::new(FileSource::new(file)); + + Client::new() + .post("https://httpbin.org/post") + .body(source) + .send() + .and_then(|mut res| { + println!("{}", res.status()); + + let body = mem::replace(res.body_mut(), Decoder::empty()); + body.concat2() + }) + .map_err(|err| println!("request error: {}", err)) + .map(|body| { + let mut body = Cursor::new(body); + let _ = io::copy(&mut body, &mut io::stdout()).map_err(|err| { + println!("stdout error: {}", err); + }); + }) + }) +} + +fn main() { + let pool = tokio_threadpool::ThreadPool::new(); + let path = concat!(env!("CARGO_MANIFEST_DIR"), "/LICENSE-APACHE"); + tokio::run(pool.spawn_handle(post(path))); +} diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index 1bc256d..42119a1 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -137,6 +137,18 @@ impl From<&'static str> for Body { } } +impl From + Send>> for Body +where + hyper::Chunk: From, + I: 'static, + E: std::error::Error + Send + Sync + 'static, +{ + #[inline] + fn from(s: Box + Send>) -> Body { + Body::wrap(::hyper::Body::wrap_stream(s)) + } +} + /// A chunk of bytes for a `Body`. /// /// A `Chunk` can be treated like `&[u8]`. diff --git a/tests/async.rs b/tests/async.rs index 4a50662..b36d929 100644 --- a/tests/async.rs +++ b/tests/async.rs @@ -3,11 +3,12 @@ extern crate libflate; extern crate reqwest; extern crate hyper; extern crate tokio; +extern crate bytes; #[macro_use] mod support; -use std::io::Write; +use std::io::{self, Write}; use std::time::Duration; use futures::{Future, Stream}; @@ -16,6 +17,8 @@ use tokio::runtime::current_thread::Runtime; use reqwest::async::Client; use reqwest::async::multipart::{Form, Part}; +use bytes::Bytes; + #[test] fn gzip_response() { gzip_case(10_000, 4096); @@ -231,3 +234,51 @@ fn gzip_case(response_size: usize, chunk_size: usize) { rt.block_on(res_future).unwrap(); } + +#[test] +fn body_stream() { + let _ = env_logger::try_init(); + + let source: Box + Send> + = Box::new(futures::stream::iter_ok::<_, io::Error>( + vec![Bytes::from_static(b"123"), Bytes::from_static(b"4567")])); + + let expected_body = "3\r\n123\r\n4\r\n4567\r\n0\r\n\r\n"; + + let server = server! { + request: format!("\ + POST /post HTTP/1.1\r\n\ + user-agent: $USERAGENT\r\n\ + accept: */*\r\n\ + accept-encoding: gzip\r\n\ + host: $HOST\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + {}\ + ", expected_body), + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: post\r\n\ + Content-Length: 7\r\n\ + \r\n\ + " + }; + + let url = format!("http://{}/post", server.addr()); + + let mut rt = Runtime::new().expect("new rt"); + + let client = Client::new(); + + let res_future = client.post(&url) + .body(source) + .send() + .and_then(|res| { + assert_eq!(res.url().as_str(), &url); + assert_eq!(res.status(), reqwest::StatusCode::OK); + + Ok(()) + }); + + rt.block_on(res_future).unwrap(); +}