From 4c21127f15ff814f8397a336fd647edc7cc89bc8 Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Fri, 4 Jan 2019 19:06:33 -0600 Subject: [PATCH] add async multipart request handling --- src/async_impl/body.rs | 6 + src/async_impl/mod.rs | 1 + src/async_impl/multipart.rs | 332 ++++++++++++++++++++++++++++++++++++ src/async_impl/request.rs | 42 +++++ src/lib.rs | 1 + src/multipart.rs | 47 +++-- tests/async.rs | 74 ++++++++ 7 files changed, 493 insertions(+), 10 deletions(-) create mode 100644 src/async_impl/multipart.rs diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index da1798e..8e7f1ae 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -182,6 +182,12 @@ impl IntoIterator for Chunk { } } +impl From for hyper::Chunk { + fn from(val: Chunk) -> hyper::Chunk { + val.inner + } +} + impl fmt::Debug for Body { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Body") diff --git a/src/async_impl/mod.rs b/src/async_impl/mod.rs index 525d6b5..f79eb61 100644 --- a/src/async_impl/mod.rs +++ b/src/async_impl/mod.rs @@ -7,5 +7,6 @@ pub use self::response::{Response, ResponseBuilderExt}; pub mod body; pub mod client; pub mod decoder; +pub mod multipart; pub(crate) mod request; mod response; diff --git a/src/async_impl/multipart.rs b/src/async_impl/multipart.rs new file mode 100644 index 0000000..7b905cd --- /dev/null +++ b/src/async_impl/multipart.rs @@ -0,0 +1,332 @@ +//! multipart/form-data +use std::borrow::Cow; +use std::fmt; + +use mime_guess::Mime; +use uuid::Uuid; +use http::HeaderMap; + +use futures::Stream; + +use super::Body; +use multipart::{PercentEncoding, PartProp}; + +/// An async multipart/form-data request. +pub struct Form { + boundary: String, + fields: Vec<(Cow<'static, str>, Part)>, + percent_encoding: PercentEncoding, +} + +impl Form { + /// Creates a new async Form without any content. + pub fn new() -> Form { + Form { + boundary: format!("{}", Uuid::new_v4().to_simple()), + fields: Vec::new(), + percent_encoding: PercentEncoding::PathSegment, + } + } + + /// Get the boundary that this form will use. + #[inline] + pub fn boundary(&self) -> &str { + &self.boundary + } + + /// Add a data field with supplied name and value. + /// + /// # Examples + /// + /// ``` + /// let form = reqwest::async::multipart::Form::new() + /// .text("username", "seanmonstar") + /// .text("password", "secret"); + /// ``` + pub fn text(self, name: T, value: U) -> Form + where T: Into>, + U: Into>, + { + self.part(name, Part::text(value)) + } + + /// Adds a customized Part. + pub fn part(mut self, name: T, part: Part) -> Form + where T: Into>, + { + self.fields.push((name.into(), part)); + self + } + + /// Configure this `Form` to percent-encode using the `path-segment` rules. + pub fn percent_encode_path_segment(mut self) -> Form { + self.percent_encoding = PercentEncoding::PathSegment; + self + } + + /// Configure this `Form` to percent-encode using the `attr-char` rules. + pub fn percent_encode_attr_chars(mut self) -> Form { + self.percent_encoding = PercentEncoding::AttrChar; + self + } + + /// Consume this instance and transform into an instance of hyper::Body for use in a request. + pub(crate) fn stream(mut self) -> hyper::Body { + if self.fields.len() == 0 { + return hyper::Body::empty(); + } + + // create initial part to init reduce chain + let (name, part) = self.fields.remove(0); + let start = self.part_stream(name, part); + + let fields = self.take_fields(); + // for each field, chain an additional stream + let stream = fields.into_iter().fold(start, |memo, (name, part)| { + let part_stream = self.part_stream(name, part); + hyper::Body::wrap_stream(memo.chain(part_stream)) + }); + // append special ending boundary + let last = hyper::Body::from(format!("--{}--\r\n", self.boundary)); + hyper::Body::wrap_stream(stream.chain(last)) + } + + /// Generate a hyper::Body stream for a single Part instance of a Form request. + pub(crate) fn part_stream>>(&mut self, name: T, part: Part) -> hyper::Body { + // start with boundary + let boundary = hyper::Body::from(format!("--{}\r\n", self.boundary)); + // append headers + let header = hyper::Body::from({ + let mut h = self.percent_encoding.encode_headers(&name.into(), &part); + h.extend_from_slice(b"\r\n\r\n"); + h + }); + // then append form data followed by terminating CRLF + hyper::Body::wrap_stream(boundary.chain(header).chain(hyper::Body::wrap_stream(part.value)).chain(hyper::Body::from("\r\n".to_owned()))) + } + + /// Take the fields vector of this instance, replacing with an empty vector. + fn take_fields(&mut self) -> Vec<(Cow<'static, str>, Part)> { + std::mem::replace(&mut self.fields, Vec::new()) + } +} + +impl fmt::Debug for Form { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Form") + .field("boundary", &self.boundary) + .field("parts", &self.fields) + .finish() + } +} + + +/// A field in a multipart form. +pub struct Part { + value: Body, + mime: Option, + file_name: Option>, + headers: HeaderMap, +} + +impl Part { + /// Makes a text parameter. + pub fn text(value: T) -> Part + where T: Into>, + { + let body = match value.into() { + Cow::Borrowed(slice) => Body::from(slice), + Cow::Owned(string) => Body::from(string), + }; + Part::new(body) + } + + /// Makes a new parameter from arbitrary bytes + pub fn bytes(value: T) -> Part + where T: Into> + { + let body = match value.into() { + Cow::Borrowed(slice) => Body::from(slice), + Cow::Owned(vec) => Body::from(vec), + }; + Part::new(body) + } + + /// Makes a new parameter from an arbitrary stream. + pub fn stream(value: T) -> Part + where hyper::Chunk: std::convert::From<::Item>, + ::Error: std::error::Error + Send + Sync { + Part::new(Body::wrap(hyper::Body::wrap_stream(value))) + } + + fn new(value: Body) -> Part { + Part { + value: value, + mime: None, + file_name: None, + headers: HeaderMap::default() + } + } + + /// Tries to set the mime of this part. + pub fn mime_str(mut self, mime: &str) -> ::Result { + self.mime = Some(try_!(mime.parse())); + Ok(self) + } + + // Re-enable when mime 0.4 is available, with split MediaType/MediaRange. + #[cfg(test)] + fn mime(mut self, mime: Mime) -> Part { + self.mime = Some(mime); + self + } + + /// Sets the filename, builder style. + pub fn file_name>>(mut self, filename: T) -> Part { + self.file_name = Some(filename.into()); + self + } + + +} + +impl PartProp for Part { + fn mime(&self) -> &Option { + &self.mime + } + + fn mime_mut(&mut self) -> &mut Option { + &mut self.mime + } + + fn file_name(&self) -> &Option> { + &self.file_name + } + + fn file_name_mut(&mut self) -> &mut Option> { + &mut self.file_name + } + + fn headers(&self) -> &HeaderMap { + &self.headers + } + + fn headers_mut(&mut self) -> &mut HeaderMap { + &mut self.headers + } +} + +impl fmt::Debug for Part { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Part") + .field("value", &self.value) + .field("mime", &self.mime) + .field("file_name", &self.file_name) + .field("headers", &self.headers) + .finish() + } +} + + +#[cfg(test)] +mod tests { + use super::*; + use tokio; + + #[test] + fn form_empty() { + let form = Form::new(); + + let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt"); + let body_ft = form.stream(); + + let out = rt.block_on(body_ft.map(|c| c.into_bytes()).concat2()); + assert_eq!(out.unwrap(), Vec::new()); + } + + #[test] + fn stream_to_end() { + let mut form = Form::new() + .part("reader1", Part::stream(futures::stream::once::<_, hyper::Error>(Ok(hyper::Chunk::from("part1".to_owned()))))) + .part("key1", Part::text("value1")) + .part( + "key2", + Part::text("value2").mime(::mime::IMAGE_BMP), + ) + .part("reader2", Part::stream(futures::stream::once::<_, hyper::Error>(Ok(hyper::Chunk::from("part2".to_owned()))))) + .part( + "key3", + Part::text("value3").file_name("filename"), + ); + form.boundary = "boundary".to_string(); + let expected = "--boundary\r\n\ + Content-Disposition: form-data; name=\"reader1\"\r\n\r\n\ + part1\r\n\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"key1\"\r\n\r\n\ + value1\r\n\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"key2\"\r\n\ + Content-Type: image/bmp\r\n\r\n\ + value2\r\n\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"reader2\"\r\n\r\n\ + part2\r\n\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"key3\"; filename=\"filename\"\r\n\r\n\ + value3\r\n--boundary--\r\n"; + let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt"); + let body_ft = form.stream(); + + let out = rt.block_on(body_ft.map(|c| c.into_bytes()).concat2()).unwrap(); + // These prints are for debug purposes in case the test fails + println!( + "START REAL\n{}\nEND REAL", + ::std::str::from_utf8(&out).unwrap() + ); + println!("START EXPECTED\n{}\nEND EXPECTED", expected); + assert_eq!(::std::str::from_utf8(&out).unwrap(), expected); + } + + #[test] + fn stream_to_end_with_header() { + let mut part = Part::text("value2").mime(::mime::IMAGE_BMP); + part.headers_mut().insert("Hdr3", "/a/b/c".parse().unwrap()); + let mut form = Form::new().part("key2", part); + form.boundary = "boundary".to_string(); + let expected = "--boundary\r\n\ + Content-Disposition: form-data; name=\"key2\"\r\n\ + Content-Type: image/bmp\r\n\ + hdr3: /a/b/c\r\n\ + \r\n\ + value2\r\n\ + --boundary--\r\n"; + let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt"); + let body_ft = form.stream(); + + let out = rt.block_on(body_ft.map(|c| c.into_bytes()).concat2()).unwrap(); + // These prints are for debug purposes in case the test fails + println!( + "START REAL\n{}\nEND REAL", + ::std::str::from_utf8(&out).unwrap() + ); + println!("START EXPECTED\n{}\nEND EXPECTED", expected); + assert_eq!(::std::str::from_utf8(&out).unwrap(), expected); + } + + #[test] + fn header_percent_encoding() { + let name = "start%'\"\r\nßend"; + let field = Part::text(""); + + assert_eq!( + PercentEncoding::PathSegment.encode_headers(name, &field), + &b"Content-Disposition: form-data; name*=utf-8''start%25'%22%0D%0A%C3%9Fend"[..] + ); + + assert_eq!( + PercentEncoding::AttrChar.encode_headers(name, &field), + &b"Content-Disposition: form-data; name*=utf-8''start%25%27%22%0D%0A%C3%9Fend"[..] + ); + } +} diff --git a/src/async_impl/request.rs b/src/async_impl/request.rs index ce2502f..0548df1 100644 --- a/src/async_impl/request.rs +++ b/src/async_impl/request.rs @@ -6,6 +6,7 @@ use serde_json; use serde_urlencoded; use super::body::{Body}; +use super::multipart as multipart_; use super::client::{Client, Pending}; use header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue}; use http::HttpTryFrom; @@ -179,6 +180,47 @@ impl RequestBuilder { self } + /// Sends a multipart/form-data body. + /// + /// ``` + /// # extern crate futures; + /// # extern crate reqwest; + /// + /// # use reqwest::Error; + /// # use futures::future::Future; + /// + /// # fn run() -> Result<(), Error> { + /// let client = reqwest::async::Client::new(); + /// let form = reqwest::async::multipart::Form::new() + /// .text("key3", "value3") + /// .text("key4", "value4"); + /// + /// let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt"); + /// + /// let response = client.post("your url") + /// .multipart(form) + /// .send() + /// .and_then(|_| { + /// Ok(()) + /// }); + /// + /// rt.block_on(response) + /// # } + /// ``` + pub fn multipart(self, multipart: multipart_::Form) -> RequestBuilder { + let mut builder = self.header( + CONTENT_TYPE, + format!( + "multipart/form-data; boundary={}", + multipart.boundary() + ).as_str() + ); + if let Ok(ref mut req) = builder.request { + *req.body_mut() = Some(Body::wrap(multipart.stream())) + } + builder + } + /// Modify the query string of the URL. /// /// Modifies the URL of this request, adding the parameters provided. diff --git a/src/lib.rs b/src/lib.rs index 05cfdb9..dcc4546 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -232,6 +232,7 @@ pub mod async { RequestBuilder, Response, ResponseBuilderExt, + multipart }; } diff --git a/src/multipart.rs b/src/multipart.rs index 8dc58dd..4d5ae9a 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -20,7 +20,7 @@ pub struct Form { percent_encoding: PercentEncoding, } -enum PercentEncoding { +pub(crate) enum PercentEncoding { PathSegment, AttrChar, } @@ -145,6 +145,15 @@ impl fmt::Debug for Form { } } +pub(crate) trait PartProp { + fn mime(&self) -> &Option; + fn mime_mut(&mut self) -> &mut Option; + fn file_name(&self) -> &Option>; + fn file_name_mut(&mut self) -> &mut Option>; + fn headers(&self) -> &HeaderMap; + fn headers_mut(&mut self) -> &mut HeaderMap; +} + /// A field in a multipart form. pub struct Part { @@ -240,13 +249,31 @@ impl Part { self } - /// Returns a reference to the map with additional header fields - pub fn headers(&self) -> &HeaderMap { + +} + +impl PartProp for Part { + fn mime(&self) -> &Option { + &self.mime + } + + fn mime_mut(&mut self) -> &mut Option { + &mut self.mime + } + + fn file_name(&self) -> &Option> { + &self.file_name + } + + fn file_name_mut(&mut self) -> &mut Option> { + &mut self.file_name + } + + fn headers(&self) -> &HeaderMap { &self.headers } - /// Returns a reference to the map with additional header fields - pub fn headers_mut(&mut self) -> &mut HeaderMap { + fn headers_mut(&mut self) -> &mut HeaderMap { &mut self.headers } } @@ -342,7 +369,7 @@ impl Read for Reader { } #[derive(Debug, Clone)] -struct AttrCharEncodeSet; +pub(crate) struct AttrCharEncodeSet; impl EncodeSet for AttrCharEncodeSet { fn contains(&self, ch: u8) -> bool { @@ -369,20 +396,20 @@ impl EncodeSet for AttrCharEncodeSet { } impl PercentEncoding { - fn encode_headers(&self, name: &str, field: &Part) -> Vec { + pub fn encode_headers(&self, name: &str, field: &T) -> Vec { let s = format!( "Content-Disposition: form-data; {}{}{}", self.format_parameter("name", name), - match field.file_name { + match *field.file_name() { Some(ref file_name) => format!("; {}", self.format_parameter("filename", file_name)), None => String::new(), }, - match field.mime { + match *field.mime() { Some(ref mime) => format!("\r\nContent-Type: {}", mime), None => "".to_string(), }, ); - field.headers.iter().fold(s.into_bytes(), |mut header, (k,v)| { + field.headers().iter().fold(s.into_bytes(), |mut header, (k,v)| { header.extend_from_slice(b"\r\n"); header.extend_from_slice(k.as_str().as_bytes()); header.extend_from_slice(b": "); diff --git a/tests/async.rs b/tests/async.rs index 7b799aa..0bfbdfc 100644 --- a/tests/async.rs +++ b/tests/async.rs @@ -1,12 +1,14 @@ extern crate futures; extern crate libflate; extern crate reqwest; +extern crate hyper; extern crate tokio; #[macro_use] mod support; use reqwest::async::Client; +use reqwest::async::multipart::{Form, Part}; use futures::{Future, Stream}; use std::io::Write; use std::time::Duration; @@ -21,6 +23,78 @@ fn async_test_gzip_single_byte_chunks() { test_gzip(10, 1); } +#[test] +fn async_test_multipart() { + let _ = env_logger::try_init(); + + let stream = futures::stream::once::<_, hyper::Error>(Ok(hyper::Chunk::from("part1 part2".to_owned()))); + let part = Part::stream(stream); + + let form = Form::new() + .text("foo", "bar") + .part("part_stream", part); + + let expected_body = format!("\ + 24\r\n\ + --{0}\r\n\r\n\ + 2E\r\n\ + Content-Disposition: form-data; name=\"foo\"\r\n\r\n\r\n\ + 3\r\n\ + bar\r\n\ + 2\r\n\ + \r\n\r\n\ + 24\r\n\ + --{0}\r\n\r\n\ + 36\r\n\ + Content-Disposition: form-data; name=\"part_stream\"\r\n\r\n\r\n\ + B\r\n\ + part1 part2\r\n\ + 2\r\n\ + \r\n\r\n\ + 26\r\n\ + --{0}--\r\n\r\n\ + 0\r\n\r\n\ + ", form.boundary()); + + let server = server! { + request: format!("\ + POST /multipart/1 HTTP/1.1\r\n\ + user-agent: $USERAGENT\r\n\ + accept: */*\r\n\ + content-type: multipart/form-data; boundary={}\r\n\ + accept-encoding: gzip\r\n\ + host: $HOST\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + {}\ + ", form.boundary(), expected_body), + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: multipart\r\n\ + Content-Length: 0\r\n\ + \r\n\ + " + }; + + let url = format!("http://{}/multipart/1", server.addr()); + + let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt"); + + let client = Client::new(); + + let res_future = client.post(&url) + .multipart(form) + .send() + .and_then(|res| { + assert_eq!(res.url().as_str(), &url); + assert_eq!(res.status(), reqwest::StatusCode::OK); + + Ok(()) + }); + + rt.block_on(res_future).unwrap(); +} + fn test_gzip(response_size: usize, chunk_size: usize) { let content: String = (0..response_size).into_iter().map(|i| format!("test {}", i)).collect(); let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap();