fix(client): EofReader by nature means the connection is closed
This commit is contained in:
		| @@ -145,7 +145,16 @@ struct PooledStreamInner<S> { | |||||||
| impl<S: NetworkStream> Read for PooledStream<S> { | impl<S: NetworkStream> Read for PooledStream<S> { | ||||||
|     #[inline] |     #[inline] | ||||||
|     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||||||
|         self.inner.as_mut().unwrap().stream.read(buf) |         match self.inner.as_mut().unwrap().stream.read(buf) { | ||||||
|  |             Ok(0) => { | ||||||
|  |                 // if the wrapped stream returns EOF (Ok(0)), that means the | ||||||
|  |                 // server has closed the stream. we must be sure this stream | ||||||
|  |                 // is dropped and not put back into the pool. | ||||||
|  |                 self.is_closed = true; | ||||||
|  |                 Ok(0) | ||||||
|  |             }, | ||||||
|  |             r => r | ||||||
|  |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -216,6 +225,7 @@ impl<S> Drop for PooledStream<S> { | |||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
| mod tests { | mod tests { | ||||||
|     use std::net::Shutdown; |     use std::net::Shutdown; | ||||||
|  |     use std::io::Read; | ||||||
|     use mock::{MockConnector}; |     use mock::{MockConnector}; | ||||||
|     use net::{NetworkConnector, NetworkStream}; |     use net::{NetworkConnector, NetworkStream}; | ||||||
|  |  | ||||||
| @@ -254,4 +264,15 @@ mod tests { | |||||||
|         let locked = pool.inner.lock().unwrap(); |         let locked = pool.inner.lock().unwrap(); | ||||||
|         assert_eq!(locked.conns.len(), 0); |         assert_eq!(locked.conns.len(), 0); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_eof_closes() { | ||||||
|  |         let pool = mocked!(); | ||||||
|  |  | ||||||
|  |         let mut stream = pool.connect("127.0.0.1", 3000, "http").unwrap(); | ||||||
|  |         assert_eq!(stream.read(&mut [0]).unwrap(), 0); | ||||||
|  |         drop(stream); | ||||||
|  |         let locked = pool.inner.lock().unwrap(); | ||||||
|  |         assert_eq!(locked.conns.len(), 0); | ||||||
|  |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -23,7 +23,6 @@ pub struct Response { | |||||||
|     pub url: Url, |     pub url: Url, | ||||||
|     status_raw: RawStatus, |     status_raw: RawStatus, | ||||||
|     message: Box<HttpMessage>, |     message: Box<HttpMessage>, | ||||||
|     is_drained: bool, |  | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Response { | impl Response { | ||||||
| @@ -54,7 +53,6 @@ impl Response { | |||||||
|             headers: headers, |             headers: headers, | ||||||
|             url: url, |             url: url, | ||||||
|             status_raw: raw_status, |             status_raw: raw_status, | ||||||
|             is_drained: !message.has_body(), |  | ||||||
|             message: message, |             message: message, | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| @@ -70,10 +68,6 @@ impl Read for Response { | |||||||
|     #[inline] |     #[inline] | ||||||
|     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||||||
|         match self.message.read(buf) { |         match self.message.read(buf) { | ||||||
|             Ok(0) => { |  | ||||||
|                 self.is_drained = true; |  | ||||||
|                 Ok(0) |  | ||||||
|             }, |  | ||||||
|             Err(e) => { |             Err(e) => { | ||||||
|                 let _ = self.message.close_connection(); |                 let _ = self.message.close_connection(); | ||||||
|                 Err(e) |                 Err(e) | ||||||
| @@ -90,8 +84,9 @@ impl Drop for Response { | |||||||
|         // |         // | ||||||
|         // otherwise, the response has been drained. we should check that the |         // otherwise, the response has been drained. we should check that the | ||||||
|         // server has agreed to keep the connection open |         // server has agreed to keep the connection open | ||||||
|         trace!("Response.drop is_drained={}", self.is_drained); |         let is_drained = !self.message.has_body(); | ||||||
|         if !(self.is_drained && http::should_keep_alive(self.version, &self.headers)) { |         trace!("Response.drop is_drained={}", is_drained); | ||||||
|  |         if !(is_drained && http::should_keep_alive(self.version, &self.headers)) { | ||||||
|             trace!("Response.drop closing connection"); |             trace!("Response.drop closing connection"); | ||||||
|             if let Err(e) = self.message.close_connection() { |             if let Err(e) = self.message.close_connection() { | ||||||
|                 error!("Response.drop error closing connection: {}", e); |                 error!("Response.drop error closing connection: {}", e); | ||||||
|   | |||||||
| @@ -247,6 +247,7 @@ impl HttpMessage for Http11Message { | |||||||
|             Some(EmptyReader(..)) | |             Some(EmptyReader(..)) | | ||||||
|             Some(SizedReader(_, 0)) | |             Some(SizedReader(_, 0)) | | ||||||
|             Some(ChunkedReader(_, Some(0))) => false, |             Some(ChunkedReader(_, Some(0))) => false, | ||||||
|  |             // specifically EofReader is always true | ||||||
|             _ => true |             _ => true | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user