feat(server): dropping a Response will write out to the underlying stream
This commit is contained in:
		| @@ -27,25 +27,21 @@ fn echo(mut req: Request, mut res: Response) { | |||||||
|                 res.headers_mut().set(ContentLength(out.len() as u64)); |                 res.headers_mut().set(ContentLength(out.len() as u64)); | ||||||
|                 let mut res = try_return!(res.start()); |                 let mut res = try_return!(res.start()); | ||||||
|                 try_return!(res.write_all(out)); |                 try_return!(res.write_all(out)); | ||||||
|                 try_return!(res.end()); |  | ||||||
|                 return; |                 return; | ||||||
|             }, |             }, | ||||||
|             (&Post, "/echo") => (), // fall through, fighting mutable borrows |             (&Post, "/echo") => (), // fall through, fighting mutable borrows | ||||||
|             _ => { |             _ => { | ||||||
|                 *res.status_mut() = hyper::NotFound; |                 *res.status_mut() = hyper::NotFound; | ||||||
|                 try_return!(res.start().and_then(|res| res.end())); |  | ||||||
|                 return; |                 return; | ||||||
|             } |             } | ||||||
|         }, |         }, | ||||||
|         _ => { |         _ => { | ||||||
|             try_return!(res.start().and_then(|res| res.end())); |  | ||||||
|             return; |             return; | ||||||
|         } |         } | ||||||
|     }; |     }; | ||||||
|  |  | ||||||
|     let mut res = try_return!(res.start()); |     let mut res = try_return!(res.start()); | ||||||
|     try_return!(copy(&mut req, &mut res)); |     try_return!(copy(&mut req, &mut res)); | ||||||
|     try_return!(res.end()); |  | ||||||
| } | } | ||||||
|  |  | ||||||
| fn main() { | fn main() { | ||||||
|   | |||||||
| @@ -15,8 +15,6 @@ | |||||||
| //!         (hyper::Get, _) => StatusCode::NotFound, | //!         (hyper::Get, _) => StatusCode::NotFound, | ||||||
| //!         _ => StatusCode::MethodNotAllowed | //!         _ => StatusCode::MethodNotAllowed | ||||||
| //!     }; | //!     }; | ||||||
| //! |  | ||||||
| //!     res.start().unwrap().end().unwrap(); |  | ||||||
| //! }).listen("0.0.0.0:8080").unwrap(); | //! }).listen("0.0.0.0:8080").unwrap(); | ||||||
| use std::fmt; | use std::fmt; | ||||||
| use std::io::{ErrorKind, BufWriter, Write}; | use std::io::{ErrorKind, BufWriter, Write}; | ||||||
|   | |||||||
| @@ -2,8 +2,11 @@ | |||||||
| //! | //! | ||||||
| //! These are responses sent by a `hyper::Server` to clients, after | //! These are responses sent by a `hyper::Server` to clients, after | ||||||
| //! receiving a request. | //! receiving a request. | ||||||
|  | use std::any::{Any, TypeId}; | ||||||
| use std::marker::PhantomData; | use std::marker::PhantomData; | ||||||
|  | use std::mem; | ||||||
| use std::io::{self, Write}; | use std::io::{self, Write}; | ||||||
|  | use std::ptr; | ||||||
|  |  | ||||||
| use time::now_utc; | use time::now_utc; | ||||||
|  |  | ||||||
| @@ -14,9 +17,10 @@ use status; | |||||||
| use net::{Fresh, Streaming}; | use net::{Fresh, Streaming}; | ||||||
| use version; | use version; | ||||||
|  |  | ||||||
|  |  | ||||||
| /// The outgoing half for a Tcp connection, created by a `Server` and given to a `Handler`. | /// The outgoing half for a Tcp connection, created by a `Server` and given to a `Handler`. | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| pub struct Response<'a, W = Fresh> { | pub struct Response<'a, W: Any = Fresh> { | ||||||
|     /// The HTTP version of this response. |     /// The HTTP version of this response. | ||||||
|     pub version: version::HttpVersion, |     pub version: version::HttpVersion, | ||||||
|     // Stream the Response is writing to, not accessible through UnwrittenResponse |     // Stream the Response is writing to, not accessible through UnwrittenResponse | ||||||
| @@ -26,10 +30,10 @@ pub struct Response<'a, W = Fresh> { | |||||||
|     // The outgoing headers on this response. |     // The outgoing headers on this response. | ||||||
|     headers: header::Headers, |     headers: header::Headers, | ||||||
|  |  | ||||||
|     _marker: PhantomData<W> |     _writing: PhantomData<W> | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<'a, W> Response<'a, W> { | impl<'a, W: Any> Response<'a, W> { | ||||||
|     /// The status of this response. |     /// The status of this response. | ||||||
|     #[inline] |     #[inline] | ||||||
|     pub fn status(&self) -> status::StatusCode { self.status } |     pub fn status(&self) -> status::StatusCode { self.status } | ||||||
| @@ -47,31 +51,26 @@ impl<'a, W> Response<'a, W> { | |||||||
|             version: version, |             version: version, | ||||||
|             body: body, |             body: body, | ||||||
|             headers: headers, |             headers: headers, | ||||||
|             _marker: PhantomData, |             _writing: PhantomData, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Deconstruct this Response into its constituent parts. |     /// Deconstruct this Response into its constituent parts. | ||||||
|     pub fn deconstruct(self) -> (version::HttpVersion, HttpWriter<&'a mut (Write + 'a)>, |     pub fn deconstruct(self) -> (version::HttpVersion, HttpWriter<&'a mut (Write + 'a)>, | ||||||
|                                  status::StatusCode, header::Headers) { |                                  status::StatusCode, header::Headers) { | ||||||
|         (self.version, self.body, self.status, self.headers) |         unsafe { | ||||||
|     } |             let parts = ( | ||||||
| } |                 self.version, | ||||||
|  |                 ptr::read(&self.body), | ||||||
| impl<'a> Response<'a, Fresh> { |                 self.status, | ||||||
|     /// Creates a new Response that can be used to write to a network stream. |                 ptr::read(&self.headers) | ||||||
|     pub fn new(stream: &'a mut (Write + 'a)) -> Response<'a, Fresh> { |             ); | ||||||
|         Response { |             mem::forget(self); | ||||||
|             status: status::StatusCode::Ok, |             parts | ||||||
|             version: version::HttpVersion::Http11, |  | ||||||
|             headers: header::Headers::new(), |  | ||||||
|             body: ThroughWriter(stream), |  | ||||||
|             _marker: PhantomData, |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Consume this Response<Fresh>, writing the Headers and Status and creating a Response<Streaming> |     fn write_head(&mut self) -> io::Result<Body> { | ||||||
|     pub fn start(mut self) -> io::Result<Response<'a, Streaming>> { |  | ||||||
|         debug!("writing head: {:?} {:?}", self.version, self.status); |         debug!("writing head: {:?} {:?}", self.version, self.status); | ||||||
|         try!(write!(&mut self.body, "{} {}{}{}", self.version, self.status, CR as char, LF as char)); |         try!(write!(&mut self.body, "{} {}{}{}", self.version, self.status, CR as char, LF as char)); | ||||||
|  |  | ||||||
| @@ -80,19 +79,14 @@ impl<'a> Response<'a, Fresh> { | |||||||
|         } |         } | ||||||
|  |  | ||||||
|  |  | ||||||
|         let mut chunked = true; |         let mut body_type = Body::Chunked; | ||||||
|         let mut len = 0; |  | ||||||
|  |  | ||||||
|         match self.headers.get::<header::ContentLength>() { |         if let Some(cl) = self.headers.get::<header::ContentLength>() { | ||||||
|             Some(cl) => { |             body_type = Body::Sized(**cl); | ||||||
|                 chunked = false; |  | ||||||
|                 len = **cl; |  | ||||||
|             }, |  | ||||||
|             None => () |  | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         // can't do in match above, thanks borrowck |         // can't do in match above, thanks borrowck | ||||||
|         if chunked { |         if body_type == Body::Chunked { | ||||||
|             let encodings = match self.headers.get_mut::<header::TransferEncoding>() { |             let encodings = match self.headers.get_mut::<header::TransferEncoding>() { | ||||||
|                 Some(&mut header::TransferEncoding(ref mut encodings)) => { |                 Some(&mut header::TransferEncoding(ref mut encodings)) => { | ||||||
|                     //TODO: check if chunked is already in encodings. use HashSet? |                     //TODO: check if chunked is already in encodings. use HashSet? | ||||||
| @@ -113,46 +107,208 @@ impl<'a> Response<'a, Fresh> { | |||||||
|         try!(write!(&mut self.body, "{}", self.headers)); |         try!(write!(&mut self.body, "{}", self.headers)); | ||||||
|         try!(write!(&mut self.body, "{}", LINE_ENDING)); |         try!(write!(&mut self.body, "{}", LINE_ENDING)); | ||||||
|  |  | ||||||
|         let stream = if chunked { |         Ok(body_type) | ||||||
|             ChunkedWriter(self.body.into_inner()) |     } | ||||||
|         } else { |  | ||||||
|             SizedWriter(self.body.into_inner(), len) |  | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<'a> Response<'a, Fresh> { | ||||||
|  |     /// Creates a new Response that can be used to write to a network stream. | ||||||
|  |     #[inline] | ||||||
|  |     pub fn new(stream: &'a mut (Write + 'a)) -> Response<'a, Fresh> { | ||||||
|  |         Response { | ||||||
|  |             status: status::StatusCode::Ok, | ||||||
|  |             version: version::HttpVersion::Http11, | ||||||
|  |             headers: header::Headers::new(), | ||||||
|  |             body: ThroughWriter(stream), | ||||||
|  |             _writing: PhantomData, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Consume this Response<Fresh>, writing the Headers and Status and creating a Response<Streaming> | ||||||
|  |     pub fn start(mut self) -> io::Result<Response<'a, Streaming>> { | ||||||
|  |         let body_type = try!(self.write_head()); | ||||||
|  |         let (version, body, status, headers) = self.deconstruct(); | ||||||
|  |         let stream = match body_type { | ||||||
|  |             Body::Chunked => ChunkedWriter(body.into_inner()), | ||||||
|  |             Body::Sized(len) => SizedWriter(body.into_inner(), len) | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         // "copy" to change the phantom type |         // "copy" to change the phantom type | ||||||
|         Ok(Response { |         Ok(Response { | ||||||
|             version: self.version, |             version: version, | ||||||
|             body: stream, |             body: stream, | ||||||
|             status: self.status, |             status: status, | ||||||
|             headers: self.headers, |             headers: headers, | ||||||
|             _marker: PhantomData, |             _writing: PhantomData, | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Get a mutable reference to the status. |     /// Get a mutable reference to the status. | ||||||
|     #[inline] |     #[inline] | ||||||
|     pub fn status_mut(&mut self) -> &mut status::StatusCode { &mut self.status } |     pub fn status_mut(&mut self) -> &mut status::StatusCode { &mut self.status } | ||||||
|  |  | ||||||
|     /// Get a mutable reference to the Headers. |     /// Get a mutable reference to the Headers. | ||||||
|  |     #[inline] | ||||||
|     pub fn headers_mut(&mut self) -> &mut header::Headers { &mut self.headers } |     pub fn headers_mut(&mut self) -> &mut header::Headers { &mut self.headers } | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
| impl<'a> Response<'a, Streaming> { | impl<'a> Response<'a, Streaming> { | ||||||
|     /// Flushes all writing of a response to the client. |     /// Flushes all writing of a response to the client. | ||||||
|  |     #[inline] | ||||||
|     pub fn end(self) -> io::Result<()> { |     pub fn end(self) -> io::Result<()> { | ||||||
|         debug!("ending"); |         trace!("ending"); | ||||||
|         try!(self.body.end()); |         let (_, body, _, _) = self.deconstruct(); | ||||||
|  |         try!(body.end()); | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<'a> Write for Response<'a, Streaming> { | impl<'a> Write for Response<'a, Streaming> { | ||||||
|  |     #[inline] | ||||||
|     fn write(&mut self, msg: &[u8]) -> io::Result<usize> { |     fn write(&mut self, msg: &[u8]) -> io::Result<usize> { | ||||||
|         debug!("write {:?} bytes", msg.len()); |         debug!("write {:?} bytes", msg.len()); | ||||||
|         self.body.write(msg) |         self.body.write(msg) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[inline] | ||||||
|     fn flush(&mut self) -> io::Result<()> { |     fn flush(&mut self) -> io::Result<()> { | ||||||
|         self.body.flush() |         self.body.flush() | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[derive(PartialEq)] | ||||||
|  | enum Body { | ||||||
|  |     Chunked, | ||||||
|  |     Sized(u64), | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<'a, T: Any> Drop for Response<'a, T> { | ||||||
|  |     fn drop(&mut self) { | ||||||
|  |         if TypeId::of::<T>() == TypeId::of::<Fresh>() { | ||||||
|  |             let mut body = match self.write_head() { | ||||||
|  |                 Ok(Body::Chunked) => ChunkedWriter(self.body.get_mut()), | ||||||
|  |                 Ok(Body::Sized(len)) => SizedWriter(self.body.get_mut(), len), | ||||||
|  |                 Err(e) => { | ||||||
|  |                     debug!("error dropping request: {:?}", e); | ||||||
|  |                     return; | ||||||
|  |                 } | ||||||
|  |             }; | ||||||
|  |             end(&mut body); | ||||||
|  |         } else { | ||||||
|  |             end(&mut self.body); | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |  | ||||||
|  |         #[inline] | ||||||
|  |         fn end<W: Write>(w: &mut W) { | ||||||
|  |             match w.write(&[]) { | ||||||
|  |                 Ok(_) => match w.flush() { | ||||||
|  |                     Ok(_) => debug!("drop successful"), | ||||||
|  |                     Err(e) => debug!("error dropping request: {:?}", e) | ||||||
|  |                 }, | ||||||
|  |                 Err(e) => debug!("error dropping request: {:?}", e) | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[cfg(test)] | ||||||
|  | mod tests { | ||||||
|  |     use mock::MockStream; | ||||||
|  |     use super::Response; | ||||||
|  |  | ||||||
|  |     macro_rules! lines { | ||||||
|  |         ($s:ident = $($line:pat),+) => ({ | ||||||
|  |             let s = String::from_utf8($s.write).unwrap(); | ||||||
|  |             let mut lines = s.split_terminator("\r\n"); | ||||||
|  |  | ||||||
|  |             $( | ||||||
|  |                 match lines.next() { | ||||||
|  |                     Some($line) => (), | ||||||
|  |                     other => panic!("line mismatch: {:?} != {:?}", other, stringify!($line)) | ||||||
|  |                 } | ||||||
|  |             )+ | ||||||
|  |  | ||||||
|  |             assert_eq!(lines.next(), None); | ||||||
|  |         }) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_fresh_start() { | ||||||
|  |         let mut stream = MockStream::new(); | ||||||
|  |         { | ||||||
|  |             let res = Response::new(&mut stream); | ||||||
|  |             res.start().unwrap().deconstruct(); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         lines! { stream = | ||||||
|  |             "HTTP/1.1 200 OK", | ||||||
|  |             _date, | ||||||
|  |             _transfer_encoding, | ||||||
|  |             "" | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_streaming_end() { | ||||||
|  |         let mut stream = MockStream::new(); | ||||||
|  |         { | ||||||
|  |             let res = Response::new(&mut stream); | ||||||
|  |             res.start().unwrap().end().unwrap(); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         lines! { stream = | ||||||
|  |             "HTTP/1.1 200 OK", | ||||||
|  |             _date, | ||||||
|  |             _transfer_encoding, | ||||||
|  |             "", | ||||||
|  |             "0", | ||||||
|  |             "" // empty zero body | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_fresh_drop() { | ||||||
|  |         use status::StatusCode; | ||||||
|  |         let mut stream = MockStream::new(); | ||||||
|  |         { | ||||||
|  |             let mut res = Response::new(&mut stream); | ||||||
|  |             *res.status_mut() = StatusCode::NotFound; | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         lines! { stream = | ||||||
|  |             "HTTP/1.1 404 Not Found", | ||||||
|  |             _date, | ||||||
|  |             _transfer_encoding, | ||||||
|  |             "", | ||||||
|  |             "0", | ||||||
|  |             "" // empty zero body | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_streaming_drop() { | ||||||
|  |         use std::io::Write; | ||||||
|  |         use status::StatusCode; | ||||||
|  |         let mut stream = MockStream::new(); | ||||||
|  |         { | ||||||
|  |             let mut res = Response::new(&mut stream); | ||||||
|  |             *res.status_mut() = StatusCode::NotFound; | ||||||
|  |             let mut stream = res.start().unwrap(); | ||||||
|  |             stream.write_all(b"foo").unwrap(); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         lines! { stream = | ||||||
|  |             "HTTP/1.1 404 Not Found", | ||||||
|  |             _date, | ||||||
|  |             _transfer_encoding, | ||||||
|  |             "", | ||||||
|  |             "3", | ||||||
|  |             "foo", | ||||||
|  |             "0", | ||||||
|  |             "" // empty zero body | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user