fix(client): return error instead of unmatched response when idle
This commit is contained in:
		| @@ -86,6 +86,9 @@ where I: AsyncRead + AsyncWrite, | ||||
|                     }); | ||||
|             } else { | ||||
|                 trace!("poll when on keep-alive"); | ||||
|                 if !T::should_read_first() { | ||||
|                     self.try_empty_read()?; | ||||
|                 } | ||||
|                 self.maybe_park_read(); | ||||
|                 return Ok(Async::NotReady); | ||||
|             } | ||||
| @@ -102,7 +105,17 @@ where I: AsyncRead + AsyncWrite, | ||||
|  | ||||
|     pub fn can_read_head(&self) -> bool { | ||||
|         match self.state.reading { | ||||
|             Reading::Init => true, | ||||
|             //Reading::Init => true, | ||||
|             Reading::Init => { | ||||
|                 if T::should_read_first() { | ||||
|                     true | ||||
|                 } else { | ||||
|                     match self.state.writing { | ||||
|                         Writing::Init => false, | ||||
|                         _ => true, | ||||
|                     } | ||||
|                 } | ||||
|             }, | ||||
|             _ => false, | ||||
|         } | ||||
|     } | ||||
| @@ -219,6 +232,41 @@ where I: AsyncRead + AsyncWrite, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     // This will check to make sure the io object read is empty. | ||||
|     // | ||||
|     // This should only be called for Clients wanting to enter the idle | ||||
|     // state. | ||||
|     pub fn try_empty_read(&mut self) -> io::Result<()> { | ||||
|         assert!(!self.can_read_head() && !self.can_read_body()); | ||||
|  | ||||
|         if !self.io.read_buf().is_empty() { | ||||
|             Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended")) | ||||
|         } else { | ||||
|              match self.io.read_from_io() { | ||||
|                 Ok(Async::Ready(0)) => { | ||||
|                     self.state.close_read(); | ||||
|                     let must_error = !self.state.is_idle() && T::should_error_on_parse_eof(); | ||||
|                     if must_error { | ||||
|                         Err(io::ErrorKind::UnexpectedEof.into()) | ||||
|                     } else { | ||||
|                         Ok(()) | ||||
|                     } | ||||
|                 }, | ||||
|                 Ok(Async::Ready(_)) => { | ||||
|                     Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended")) | ||||
|                 }, | ||||
|                 Ok(Async::NotReady) => { | ||||
|                     trace!("try_empty_read; read blocked"); | ||||
|                     Ok(()) | ||||
|                 }, | ||||
|                 Err(e) => { | ||||
|                     self.state.close(); | ||||
|                     Err(e) | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn maybe_notify(&mut self) { | ||||
|         // its possible that we returned NotReady from poll() without having | ||||
|         // exhausted the underlying Io. We would have done this when we | ||||
| @@ -882,7 +930,7 @@ mod tests { | ||||
|     fn test_conn_init_read_eof_busy() { | ||||
|         let _: Result<(), ()> = future::lazy(|| { | ||||
|             // server ignores | ||||
|             let io = AsyncIo::new_buf(vec![], 1); | ||||
|             let io = AsyncIo::new_eof(); | ||||
|             let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default()); | ||||
|             conn.state.busy(); | ||||
|  | ||||
| @@ -892,7 +940,7 @@ mod tests { | ||||
|             } | ||||
|  | ||||
|             // client | ||||
|             let io = AsyncIo::new_buf(vec![], 1); | ||||
|             let io = AsyncIo::new_eof(); | ||||
|             let mut conn = Conn::<_, proto::Chunk, ClientTransaction>::new(io, Default::default()); | ||||
|             conn.state.busy(); | ||||
|  | ||||
|   | ||||
| @@ -137,6 +137,9 @@ where | ||||
|                 } else { | ||||
|                     let _ = body.close(); | ||||
|                 } | ||||
|             } else if !T::should_read_first() { | ||||
|                 self.conn.try_empty_read()?; | ||||
|                 return Ok(Async::NotReady); | ||||
|             } else { | ||||
|                 self.conn.maybe_park_read(); | ||||
|                 return Ok(Async::Ready(())); | ||||
| @@ -188,13 +191,6 @@ where | ||||
|     } | ||||
|  | ||||
|     fn is_done(&self) -> bool { | ||||
|         trace!( | ||||
|             "is_done; read={}, write={}, should_poll={}, body={}", | ||||
|             self.conn.is_read_closed(), | ||||
|             self.conn.is_write_closed(), | ||||
|             self.dispatch.should_poll(), | ||||
|             self.body_rx.is_some(), | ||||
|         ); | ||||
|         let read_done = self.conn.is_read_closed(); | ||||
|  | ||||
|         if !T::should_read_first() && read_done { | ||||
| @@ -223,6 +219,7 @@ where | ||||
|  | ||||
|     #[inline] | ||||
|     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||
|         trace!("Dispatcher::poll"); | ||||
|         self.poll_read()?; | ||||
|         self.poll_write()?; | ||||
|         self.poll_flush()?; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user