add async multipart request handling

This commit is contained in:
Kevin Wilson
2019-01-04 19:06:33 -06:00
committed by Sean McArthur
parent 11d7812e88
commit 4c21127f15
7 changed files with 493 additions and 10 deletions

View File

@@ -182,6 +182,12 @@ impl IntoIterator for Chunk {
}
}
impl From<Chunk> for hyper::Chunk {
fn from(val: Chunk) -> hyper::Chunk {
val.inner
}
}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Body")

View File

@@ -7,5 +7,6 @@ pub use self::response::{Response, ResponseBuilderExt};
pub mod body;
pub mod client;
pub mod decoder;
pub mod multipart;
pub(crate) mod request;
mod response;

332
src/async_impl/multipart.rs Normal file
View File

@@ -0,0 +1,332 @@
//! multipart/form-data
use std::borrow::Cow;
use std::fmt;
use mime_guess::Mime;
use uuid::Uuid;
use http::HeaderMap;
use futures::Stream;
use super::Body;
use multipart::{PercentEncoding, PartProp};
/// An async multipart/form-data request.
pub struct Form {
boundary: String,
fields: Vec<(Cow<'static, str>, Part)>,
percent_encoding: PercentEncoding,
}
impl Form {
/// Creates a new async Form without any content.
pub fn new() -> Form {
Form {
boundary: format!("{}", Uuid::new_v4().to_simple()),
fields: Vec::new(),
percent_encoding: PercentEncoding::PathSegment,
}
}
/// Get the boundary that this form will use.
#[inline]
pub fn boundary(&self) -> &str {
&self.boundary
}
/// Add a data field with supplied name and value.
///
/// # Examples
///
/// ```
/// let form = reqwest::async::multipart::Form::new()
/// .text("username", "seanmonstar")
/// .text("password", "secret");
/// ```
pub fn text<T, U>(self, name: T, value: U) -> Form
where T: Into<Cow<'static, str>>,
U: Into<Cow<'static, str>>,
{
self.part(name, Part::text(value))
}
/// Adds a customized Part.
pub fn part<T>(mut self, name: T, part: Part) -> Form
where T: Into<Cow<'static, str>>,
{
self.fields.push((name.into(), part));
self
}
/// Configure this `Form` to percent-encode using the `path-segment` rules.
pub fn percent_encode_path_segment(mut self) -> Form {
self.percent_encoding = PercentEncoding::PathSegment;
self
}
/// Configure this `Form` to percent-encode using the `attr-char` rules.
pub fn percent_encode_attr_chars(mut self) -> Form {
self.percent_encoding = PercentEncoding::AttrChar;
self
}
/// Consume this instance and transform into an instance of hyper::Body for use in a request.
pub(crate) fn stream(mut self) -> hyper::Body {
if self.fields.len() == 0 {
return hyper::Body::empty();
}
// create initial part to init reduce chain
let (name, part) = self.fields.remove(0);
let start = self.part_stream(name, part);
let fields = self.take_fields();
// for each field, chain an additional stream
let stream = fields.into_iter().fold(start, |memo, (name, part)| {
let part_stream = self.part_stream(name, part);
hyper::Body::wrap_stream(memo.chain(part_stream))
});
// append special ending boundary
let last = hyper::Body::from(format!("--{}--\r\n", self.boundary));
hyper::Body::wrap_stream(stream.chain(last))
}
/// Generate a hyper::Body stream for a single Part instance of a Form request.
pub(crate) fn part_stream<T: Into<Cow<'static, str>>>(&mut self, name: T, part: Part) -> hyper::Body {
// start with boundary
let boundary = hyper::Body::from(format!("--{}\r\n", self.boundary));
// append headers
let header = hyper::Body::from({
let mut h = self.percent_encoding.encode_headers(&name.into(), &part);
h.extend_from_slice(b"\r\n\r\n");
h
});
// then append form data followed by terminating CRLF
hyper::Body::wrap_stream(boundary.chain(header).chain(hyper::Body::wrap_stream(part.value)).chain(hyper::Body::from("\r\n".to_owned())))
}
/// Take the fields vector of this instance, replacing with an empty vector.
fn take_fields(&mut self) -> Vec<(Cow<'static, str>, Part)> {
std::mem::replace(&mut self.fields, Vec::new())
}
}
impl fmt::Debug for Form {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Form")
.field("boundary", &self.boundary)
.field("parts", &self.fields)
.finish()
}
}
/// A field in a multipart form.
pub struct Part {
value: Body,
mime: Option<Mime>,
file_name: Option<Cow<'static, str>>,
headers: HeaderMap,
}
impl Part {
/// Makes a text parameter.
pub fn text<T>(value: T) -> Part
where T: Into<Cow<'static, str>>,
{
let body = match value.into() {
Cow::Borrowed(slice) => Body::from(slice),
Cow::Owned(string) => Body::from(string),
};
Part::new(body)
}
/// Makes a new parameter from arbitrary bytes
pub fn bytes<T>(value: T) -> Part
where T: Into<Cow<'static, [u8]>>
{
let body = match value.into() {
Cow::Borrowed(slice) => Body::from(slice),
Cow::Owned(vec) => Body::from(vec),
};
Part::new(body)
}
/// Makes a new parameter from an arbitrary stream.
pub fn stream<T: Stream + Send + 'static>(value: T) -> Part
where hyper::Chunk: std::convert::From<<T as futures::Stream>::Item>,
<T as futures::Stream>::Error: std::error::Error + Send + Sync {
Part::new(Body::wrap(hyper::Body::wrap_stream(value)))
}
fn new(value: Body) -> Part {
Part {
value: value,
mime: None,
file_name: None,
headers: HeaderMap::default()
}
}
/// Tries to set the mime of this part.
pub fn mime_str(mut self, mime: &str) -> ::Result<Part> {
self.mime = Some(try_!(mime.parse()));
Ok(self)
}
// Re-enable when mime 0.4 is available, with split MediaType/MediaRange.
#[cfg(test)]
fn mime(mut self, mime: Mime) -> Part {
self.mime = Some(mime);
self
}
/// Sets the filename, builder style.
pub fn file_name<T: Into<Cow<'static, str>>>(mut self, filename: T) -> Part {
self.file_name = Some(filename.into());
self
}
}
impl PartProp for Part {
fn mime(&self) -> &Option<Mime> {
&self.mime
}
fn mime_mut(&mut self) -> &mut Option<Mime> {
&mut self.mime
}
fn file_name(&self) -> &Option<Cow<'static, str>> {
&self.file_name
}
fn file_name_mut(&mut self) -> &mut Option<Cow<'static, str>> {
&mut self.file_name
}
fn headers(&self) -> &HeaderMap {
&self.headers
}
fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.headers
}
}
impl fmt::Debug for Part {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Part")
.field("value", &self.value)
.field("mime", &self.mime)
.field("file_name", &self.file_name)
.field("headers", &self.headers)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio;
#[test]
fn form_empty() {
let form = Form::new();
let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt");
let body_ft = form.stream();
let out = rt.block_on(body_ft.map(|c| c.into_bytes()).concat2());
assert_eq!(out.unwrap(), Vec::new());
}
#[test]
fn stream_to_end() {
let mut form = Form::new()
.part("reader1", Part::stream(futures::stream::once::<_, hyper::Error>(Ok(hyper::Chunk::from("part1".to_owned())))))
.part("key1", Part::text("value1"))
.part(
"key2",
Part::text("value2").mime(::mime::IMAGE_BMP),
)
.part("reader2", Part::stream(futures::stream::once::<_, hyper::Error>(Ok(hyper::Chunk::from("part2".to_owned())))))
.part(
"key3",
Part::text("value3").file_name("filename"),
);
form.boundary = "boundary".to_string();
let expected = "--boundary\r\n\
Content-Disposition: form-data; name=\"reader1\"\r\n\r\n\
part1\r\n\
--boundary\r\n\
Content-Disposition: form-data; name=\"key1\"\r\n\r\n\
value1\r\n\
--boundary\r\n\
Content-Disposition: form-data; name=\"key2\"\r\n\
Content-Type: image/bmp\r\n\r\n\
value2\r\n\
--boundary\r\n\
Content-Disposition: form-data; name=\"reader2\"\r\n\r\n\
part2\r\n\
--boundary\r\n\
Content-Disposition: form-data; name=\"key3\"; filename=\"filename\"\r\n\r\n\
value3\r\n--boundary--\r\n";
let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt");
let body_ft = form.stream();
let out = rt.block_on(body_ft.map(|c| c.into_bytes()).concat2()).unwrap();
// These prints are for debug purposes in case the test fails
println!(
"START REAL\n{}\nEND REAL",
::std::str::from_utf8(&out).unwrap()
);
println!("START EXPECTED\n{}\nEND EXPECTED", expected);
assert_eq!(::std::str::from_utf8(&out).unwrap(), expected);
}
#[test]
fn stream_to_end_with_header() {
let mut part = Part::text("value2").mime(::mime::IMAGE_BMP);
part.headers_mut().insert("Hdr3", "/a/b/c".parse().unwrap());
let mut form = Form::new().part("key2", part);
form.boundary = "boundary".to_string();
let expected = "--boundary\r\n\
Content-Disposition: form-data; name=\"key2\"\r\n\
Content-Type: image/bmp\r\n\
hdr3: /a/b/c\r\n\
\r\n\
value2\r\n\
--boundary--\r\n";
let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt");
let body_ft = form.stream();
let out = rt.block_on(body_ft.map(|c| c.into_bytes()).concat2()).unwrap();
// These prints are for debug purposes in case the test fails
println!(
"START REAL\n{}\nEND REAL",
::std::str::from_utf8(&out).unwrap()
);
println!("START EXPECTED\n{}\nEND EXPECTED", expected);
assert_eq!(::std::str::from_utf8(&out).unwrap(), expected);
}
#[test]
fn header_percent_encoding() {
let name = "start%'\"\r\nßend";
let field = Part::text("");
assert_eq!(
PercentEncoding::PathSegment.encode_headers(name, &field),
&b"Content-Disposition: form-data; name*=utf-8''start%25'%22%0D%0A%C3%9Fend"[..]
);
assert_eq!(
PercentEncoding::AttrChar.encode_headers(name, &field),
&b"Content-Disposition: form-data; name*=utf-8''start%25%27%22%0D%0A%C3%9Fend"[..]
);
}
}

View File

@@ -6,6 +6,7 @@ use serde_json;
use serde_urlencoded;
use super::body::{Body};
use super::multipart as multipart_;
use super::client::{Client, Pending};
use header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
use http::HttpTryFrom;
@@ -179,6 +180,47 @@ impl RequestBuilder {
self
}
/// Sends a multipart/form-data body.
///
/// ```
/// # extern crate futures;
/// # extern crate reqwest;
///
/// # use reqwest::Error;
/// # use futures::future::Future;
///
/// # fn run() -> Result<(), Error> {
/// let client = reqwest::async::Client::new();
/// let form = reqwest::async::multipart::Form::new()
/// .text("key3", "value3")
/// .text("key4", "value4");
///
/// let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt");
///
/// let response = client.post("your url")
/// .multipart(form)
/// .send()
/// .and_then(|_| {
/// Ok(())
/// });
///
/// rt.block_on(response)
/// # }
/// ```
pub fn multipart(self, multipart: multipart_::Form) -> RequestBuilder {
let mut builder = self.header(
CONTENT_TYPE,
format!(
"multipart/form-data; boundary={}",
multipart.boundary()
).as_str()
);
if let Ok(ref mut req) = builder.request {
*req.body_mut() = Some(Body::wrap(multipart.stream()))
}
builder
}
/// Modify the query string of the URL.
///
/// Modifies the URL of this request, adding the parameters provided.

View File

@@ -232,6 +232,7 @@ pub mod async {
RequestBuilder,
Response,
ResponseBuilderExt,
multipart
};
}

View File

@@ -20,7 +20,7 @@ pub struct Form {
percent_encoding: PercentEncoding,
}
enum PercentEncoding {
pub(crate) enum PercentEncoding {
PathSegment,
AttrChar,
}
@@ -145,6 +145,15 @@ impl fmt::Debug for Form {
}
}
pub(crate) trait PartProp {
fn mime(&self) -> &Option<Mime>;
fn mime_mut(&mut self) -> &mut Option<Mime>;
fn file_name(&self) -> &Option<Cow<'static, str>>;
fn file_name_mut(&mut self) -> &mut Option<Cow<'static, str>>;
fn headers(&self) -> &HeaderMap;
fn headers_mut(&mut self) -> &mut HeaderMap;
}
/// A field in a multipart form.
pub struct Part {
@@ -240,13 +249,31 @@ impl Part {
self
}
/// Returns a reference to the map with additional header fields
pub fn headers(&self) -> &HeaderMap {
}
impl PartProp for Part {
fn mime(&self) -> &Option<Mime> {
&self.mime
}
fn mime_mut(&mut self) -> &mut Option<Mime> {
&mut self.mime
}
fn file_name(&self) -> &Option<Cow<'static, str>> {
&self.file_name
}
fn file_name_mut(&mut self) -> &mut Option<Cow<'static, str>> {
&mut self.file_name
}
fn headers(&self) -> &HeaderMap {
&self.headers
}
/// Returns a reference to the map with additional header fields
pub fn headers_mut(&mut self) -> &mut HeaderMap {
fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.headers
}
}
@@ -342,7 +369,7 @@ impl Read for Reader {
}
#[derive(Debug, Clone)]
struct AttrCharEncodeSet;
pub(crate) struct AttrCharEncodeSet;
impl EncodeSet for AttrCharEncodeSet {
fn contains(&self, ch: u8) -> bool {
@@ -369,20 +396,20 @@ impl EncodeSet for AttrCharEncodeSet {
}
impl PercentEncoding {
fn encode_headers(&self, name: &str, field: &Part) -> Vec<u8> {
pub fn encode_headers<T: PartProp>(&self, name: &str, field: &T) -> Vec<u8> {
let s = format!(
"Content-Disposition: form-data; {}{}{}",
self.format_parameter("name", name),
match field.file_name {
match *field.file_name() {
Some(ref file_name) => format!("; {}", self.format_parameter("filename", file_name)),
None => String::new(),
},
match field.mime {
match *field.mime() {
Some(ref mime) => format!("\r\nContent-Type: {}", mime),
None => "".to_string(),
},
);
field.headers.iter().fold(s.into_bytes(), |mut header, (k,v)| {
field.headers().iter().fold(s.into_bytes(), |mut header, (k,v)| {
header.extend_from_slice(b"\r\n");
header.extend_from_slice(k.as_str().as_bytes());
header.extend_from_slice(b": ");

View File

@@ -1,12 +1,14 @@
extern crate futures;
extern crate libflate;
extern crate reqwest;
extern crate hyper;
extern crate tokio;
#[macro_use]
mod support;
use reqwest::async::Client;
use reqwest::async::multipart::{Form, Part};
use futures::{Future, Stream};
use std::io::Write;
use std::time::Duration;
@@ -21,6 +23,78 @@ fn async_test_gzip_single_byte_chunks() {
test_gzip(10, 1);
}
#[test]
fn async_test_multipart() {
let _ = env_logger::try_init();
let stream = futures::stream::once::<_, hyper::Error>(Ok(hyper::Chunk::from("part1 part2".to_owned())));
let part = Part::stream(stream);
let form = Form::new()
.text("foo", "bar")
.part("part_stream", part);
let expected_body = format!("\
24\r\n\
--{0}\r\n\r\n\
2E\r\n\
Content-Disposition: form-data; name=\"foo\"\r\n\r\n\r\n\
3\r\n\
bar\r\n\
2\r\n\
\r\n\r\n\
24\r\n\
--{0}\r\n\r\n\
36\r\n\
Content-Disposition: form-data; name=\"part_stream\"\r\n\r\n\r\n\
B\r\n\
part1 part2\r\n\
2\r\n\
\r\n\r\n\
26\r\n\
--{0}--\r\n\r\n\
0\r\n\r\n\
", form.boundary());
let server = server! {
request: format!("\
POST /multipart/1 HTTP/1.1\r\n\
user-agent: $USERAGENT\r\n\
accept: */*\r\n\
content-type: multipart/form-data; boundary={}\r\n\
accept-encoding: gzip\r\n\
host: $HOST\r\n\
transfer-encoding: chunked\r\n\
\r\n\
{}\
", form.boundary(), expected_body),
response: b"\
HTTP/1.1 200 OK\r\n\
Server: multipart\r\n\
Content-Length: 0\r\n\
\r\n\
"
};
let url = format!("http://{}/multipart/1", server.addr());
let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt");
let client = Client::new();
let res_future = client.post(&url)
.multipart(form)
.send()
.and_then(|res| {
assert_eq!(res.url().as_str(), &url);
assert_eq!(res.status(), reqwest::StatusCode::OK);
Ok(())
});
rt.block_on(res_future).unwrap();
}
fn test_gzip(response_size: usize, chunk_size: usize) {
let content: String = (0..response_size).into_iter().map(|i| format!("test {}", i)).collect();
let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap();