Added support for sending requests with streaming Body (#545)

This commit is contained in:
Dmitry Pankratov
2019-06-14 19:29:54 +02:00
committed by Sean McArthur
parent 9f256405e5
commit e16aed5e3c
4 changed files with 147 additions and 1 deletions

View File

@@ -53,9 +53,11 @@ time = "0.1.42"
[dev-dependencies]
env_logger = "0.6"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "0.1.7", default-features = false, features = ["rt-full", "tcp", "fs"] }
tokio-tcp = "0.1"
libflate = "0.1"
doc-comment = "0.3"
bytes = "0.4"
[features]
default = ["default-tls"]

81
examples/async_stream.rs Normal file
View File

@@ -0,0 +1,81 @@
#![deny(warnings)]
#[macro_use]
extern crate futures;
extern crate bytes;
extern crate reqwest;
extern crate tokio;
extern crate tokio_threadpool;
use std::io::{self, Cursor};
use std::mem;
use std::path::Path;
use bytes::Bytes;
use futures::{Async, Future, Poll, Stream};
use reqwest::async::{Client, Decoder};
use tokio::fs::File;
use tokio::io::AsyncRead;
const CHUNK_SIZE: usize = 1024;
struct FileSource {
inner: File,
}
impl FileSource {
fn new(file: File) -> FileSource {
FileSource { inner: file }
}
}
impl Stream for FileSource {
type Item = Bytes;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut buf = [0; CHUNK_SIZE];
let size = try_ready!(self.inner.poll_read(&mut buf));
if size > 0 {
Ok(Async::Ready(Some(buf[0..size].into())))
} else {
Ok(Async::Ready(None))
}
}
}
fn post<P>(path: P) -> impl Future<Item = (), Error = ()>
where
P: AsRef<Path>,
{
File::open(path.as_ref().to_owned())
.map_err(|err| println!("request error: {}", err))
.and_then(|file| {
let source: Box<Stream<Item = Bytes, Error = io::Error> + Send> =
Box::new(FileSource::new(file));
Client::new()
.post("https://httpbin.org/post")
.body(source)
.send()
.and_then(|mut res| {
println!("{}", res.status());
let body = mem::replace(res.body_mut(), Decoder::empty());
body.concat2()
})
.map_err(|err| println!("request error: {}", err))
.map(|body| {
let mut body = Cursor::new(body);
let _ = io::copy(&mut body, &mut io::stdout()).map_err(|err| {
println!("stdout error: {}", err);
});
})
})
}
fn main() {
let pool = tokio_threadpool::ThreadPool::new();
let path = concat!(env!("CARGO_MANIFEST_DIR"), "/LICENSE-APACHE");
tokio::run(pool.spawn_handle(post(path)));
}

View File

@@ -137,6 +137,18 @@ impl From<&'static str> for Body {
}
}
impl<I, E> From<Box<dyn Stream<Item = I, Error = E> + Send>> for Body
where
hyper::Chunk: From<I>,
I: 'static,
E: std::error::Error + Send + Sync + 'static,
{
#[inline]
fn from(s: Box<dyn Stream<Item = I, Error = E> + Send>) -> Body {
Body::wrap(::hyper::Body::wrap_stream(s))
}
}
/// A chunk of bytes for a `Body`.
///
/// A `Chunk` can be treated like `&[u8]`.

View File

@@ -3,11 +3,12 @@ extern crate libflate;
extern crate reqwest;
extern crate hyper;
extern crate tokio;
extern crate bytes;
#[macro_use]
mod support;
use std::io::Write;
use std::io::{self, Write};
use std::time::Duration;
use futures::{Future, Stream};
@@ -16,6 +17,8 @@ use tokio::runtime::current_thread::Runtime;
use reqwest::async::Client;
use reqwest::async::multipart::{Form, Part};
use bytes::Bytes;
#[test]
fn gzip_response() {
gzip_case(10_000, 4096);
@@ -231,3 +234,51 @@ fn gzip_case(response_size: usize, chunk_size: usize) {
rt.block_on(res_future).unwrap();
}
#[test]
fn body_stream() {
let _ = env_logger::try_init();
let source: Box<Stream<Item = Bytes, Error = io::Error> + Send>
= Box::new(futures::stream::iter_ok::<_, io::Error>(
vec![Bytes::from_static(b"123"), Bytes::from_static(b"4567")]));
let expected_body = "3\r\n123\r\n4\r\n4567\r\n0\r\n\r\n";
let server = server! {
request: format!("\
POST /post HTTP/1.1\r\n\
user-agent: $USERAGENT\r\n\
accept: */*\r\n\
accept-encoding: gzip\r\n\
host: $HOST\r\n\
transfer-encoding: chunked\r\n\
\r\n\
{}\
", expected_body),
response: b"\
HTTP/1.1 200 OK\r\n\
Server: post\r\n\
Content-Length: 7\r\n\
\r\n\
"
};
let url = format!("http://{}/post", server.addr());
let mut rt = Runtime::new().expect("new rt");
let client = Client::new();
let res_future = client.post(&url)
.body(source)
.send()
.and_then(|res| {
assert_eq!(res.url().as_str(), &url);
assert_eq!(res.status(), reqwest::StatusCode::OK);
Ok(())
});
rt.block_on(res_future).unwrap();
}