fix(client): check for drained stream in Response::drop
This commit is contained in:
		| @@ -116,7 +116,6 @@ impl<C: NetworkConnector<Stream=S>, S: NetworkStream + Send> NetworkConnector fo | |||||||
|         Ok(PooledStream { |         Ok(PooledStream { | ||||||
|             inner: Some((key, conn)), |             inner: Some((key, conn)), | ||||||
|             is_closed: false, |             is_closed: false, | ||||||
|             is_drained: false, |  | ||||||
|             pool: self.inner.clone() |             pool: self.inner.clone() | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| @@ -130,20 +129,13 @@ impl<C: NetworkConnector<Stream=S>, S: NetworkStream + Send> NetworkConnector fo | |||||||
| pub struct PooledStream<S> { | pub struct PooledStream<S> { | ||||||
|     inner: Option<(Key, S)>, |     inner: Option<(Key, S)>, | ||||||
|     is_closed: bool, |     is_closed: bool, | ||||||
|     is_drained: bool, |  | ||||||
|     pool: Arc<Mutex<PoolImpl<S>>> |     pool: Arc<Mutex<PoolImpl<S>>> | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<S: NetworkStream> Read for PooledStream<S> { | impl<S: NetworkStream> Read for PooledStream<S> { | ||||||
|     #[inline] |     #[inline] | ||||||
|     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||||||
|         match self.inner.as_mut().unwrap().1.read(buf) { |         self.inner.as_mut().unwrap().1.read(buf) | ||||||
|             Ok(0) => { |  | ||||||
|                 self.is_drained = true; |  | ||||||
|                 Ok(0) |  | ||||||
|             } |  | ||||||
|             r => r |  | ||||||
|         } |  | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -174,8 +166,8 @@ impl<S: NetworkStream> NetworkStream for PooledStream<S> { | |||||||
|  |  | ||||||
| impl<S> Drop for PooledStream<S> { | impl<S> Drop for PooledStream<S> { | ||||||
|     fn drop(&mut self) { |     fn drop(&mut self) { | ||||||
|         trace!("PooledStream.drop, is_closed={}, is_drained={}", self.is_closed, self.is_drained); |         trace!("PooledStream.drop, is_closed={}", self.is_closed); | ||||||
|         if !self.is_closed && self.is_drained { |         if !self.is_closed { | ||||||
|             self.inner.take().map(|(key, conn)| { |             self.inner.take().map(|(key, conn)| { | ||||||
|                 if let Ok(mut pool) = self.pool.lock() { |                 if let Ok(mut pool) = self.pool.lock() { | ||||||
|                     pool.reuse(key, conn); |                     pool.reuse(key, conn); | ||||||
| @@ -205,13 +197,13 @@ mod tests { | |||||||
|     fn test_connect_and_drop() { |     fn test_connect_and_drop() { | ||||||
|         let pool = mocked!(); |         let pool = mocked!(); | ||||||
|         let key = key("127.0.0.1", 3000, "http"); |         let key = key("127.0.0.1", 3000, "http"); | ||||||
|         pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true; |         pool.connect("127.0.0.1", 3000, "http").unwrap(); | ||||||
|         { |         { | ||||||
|             let locked = pool.inner.lock().unwrap(); |             let locked = pool.inner.lock().unwrap(); | ||||||
|             assert_eq!(locked.conns.len(), 1); |             assert_eq!(locked.conns.len(), 1); | ||||||
|             assert_eq!(locked.conns.get(&key).unwrap().len(), 1); |             assert_eq!(locked.conns.get(&key).unwrap().len(), 1); | ||||||
|         } |         } | ||||||
|         pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true; //reused |         pool.connect("127.0.0.1", 3000, "http").unwrap(); //reused | ||||||
|         { |         { | ||||||
|             let locked = pool.inner.lock().unwrap(); |             let locked = pool.inner.lock().unwrap(); | ||||||
|             assert_eq!(locked.conns.len(), 1); |             assert_eq!(locked.conns.len(), 1); | ||||||
|   | |||||||
| @@ -19,6 +19,7 @@ pub struct Response { | |||||||
|     pub version: version::HttpVersion, |     pub version: version::HttpVersion, | ||||||
|     status_raw: RawStatus, |     status_raw: RawStatus, | ||||||
|     message: Box<HttpMessage>, |     message: Box<HttpMessage>, | ||||||
|  |     is_drained: bool, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Response { | impl Response { | ||||||
| @@ -43,6 +44,7 @@ impl Response { | |||||||
|             headers: headers, |             headers: headers, | ||||||
|             message: message, |             message: message, | ||||||
|             status_raw: raw_status, |             status_raw: raw_status, | ||||||
|  |             is_drained: false, | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -50,34 +52,46 @@ impl Response { | |||||||
|     pub fn status_raw(&self) -> &RawStatus { |     pub fn status_raw(&self) -> &RawStatus { | ||||||
|         &self.status_raw |         &self.status_raw | ||||||
|     } |     } | ||||||
|  |  | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Read for Response { | impl Read for Response { | ||||||
|     #[inline] |     #[inline] | ||||||
|     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||||||
|         let count = try!(self.message.read(buf)); |         match self.message.read(buf) { | ||||||
|  |             Ok(0) => { | ||||||
|  |                 self.is_drained = true; | ||||||
|  |                 Ok(0) | ||||||
|  |             }, | ||||||
|  |             r => r | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|         if count == 0 { | impl Drop for Response { | ||||||
|             if !http::should_keep_alive(self.version, &self.headers) { |     fn drop(&mut self) { | ||||||
|                 try!(self.message.close_connection() |         // if not drained, theres old bits in the Reader. we can't reuse this, | ||||||
|                                  .map_err(|_| io::Error::new(io::ErrorKind::Other, |         // since those old bits would end up in new Responses | ||||||
|                                                              "Error closing connection"))); |         // | ||||||
|  |         // otherwise, the response has been drained. we should check that the | ||||||
|  |         // server has agreed to keep the connection open | ||||||
|  |         trace!("Response.is_drained = {:?}", self.is_drained); | ||||||
|  |         if !(self.is_drained && http::should_keep_alive(self.version, &self.headers)) { | ||||||
|  |             trace!("closing connection"); | ||||||
|  |             if let Err(e) = self.message.close_connection() { | ||||||
|  |                 error!("error closing connection: {}", e); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         Ok(count) |  | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
| mod tests { | mod tests { | ||||||
|     use std::borrow::Cow::Borrowed; |  | ||||||
|     use std::io::{self, Read}; |     use std::io::{self, Read}; | ||||||
|  |  | ||||||
|     use header::Headers; |  | ||||||
|     use header::TransferEncoding; |     use header::TransferEncoding; | ||||||
|     use header::Encoding; |     use header::Encoding; | ||||||
|     use http::RawStatus; |     use http::HttpMessage; | ||||||
|     use mock::MockStream; |     use mock::MockStream; | ||||||
|     use status; |     use status; | ||||||
|     use version; |     use version; | ||||||
| @@ -94,18 +108,10 @@ mod tests { | |||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_into_inner() { |     fn test_into_inner() { | ||||||
|         let res = Response { |         let message: Box<HttpMessage> = Box::new(Http11Message::with_stream(Box::new(MockStream::new()))); | ||||||
|             status: status::StatusCode::Ok, |         let message = message.downcast::<Http11Message>().ok().unwrap(); | ||||||
|             headers: Headers::new(), |  | ||||||
|             version: version::HttpVersion::Http11, |  | ||||||
|             message: Box::new(Http11Message::with_stream(Box::new(MockStream::new()))), |  | ||||||
|             status_raw: RawStatus(200, Borrowed("OK")), |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         let message = res.message.downcast::<Http11Message>().ok().unwrap(); |  | ||||||
|         let b = message.into_inner().downcast::<MockStream>().ok().unwrap(); |         let b = message.into_inner().downcast::<MockStream>().ok().unwrap(); | ||||||
|         assert_eq!(b, Box::new(MockStream::new())); |         assert_eq!(b, Box::new(MockStream::new())); | ||||||
|  |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|   | |||||||
| @@ -20,6 +20,7 @@ pub struct RawStatus(pub u16, pub Cow<'static, str>); | |||||||
| /// Checks if a connection should be kept alive. | /// Checks if a connection should be kept alive. | ||||||
| #[inline] | #[inline] | ||||||
| pub fn should_keep_alive(version: HttpVersion, headers: &Headers) -> bool { | pub fn should_keep_alive(version: HttpVersion, headers: &Headers) -> bool { | ||||||
|  |     trace!("should_keep_alive( {:?}, {:?} )", version, headers.get::<Connection>()); | ||||||
|     match (version, headers.get::<Connection>()) { |     match (version, headers.get::<Connection>()) { | ||||||
|         (Http10, Some(conn)) if !conn.contains(&KeepAlive) => false, |         (Http10, Some(conn)) if !conn.contains(&KeepAlive) => false, | ||||||
|         (Http11, Some(conn)) if conn.contains(&Close)  => false, |         (Http11, Some(conn)) if conn.contains(&Close)  => false, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user