| @@ -47,6 +47,35 @@ where I: AsyncRead + AsyncWrite, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|  | ||||
|     fn poll2(&mut self) -> Poll<Option<Frame<http::MessageHead<T::Incoming>, http::Chunk, ::Error>>, io::Error> { | ||||
|         trace!("Conn::poll()"); | ||||
|  | ||||
|         loop { | ||||
|             if self.is_read_closed() { | ||||
|                 trace!("Conn::poll when closed"); | ||||
|                 return Ok(Async::Ready(None)); | ||||
|             } else if self.can_read_head() { | ||||
|                 return self.read_head(); | ||||
|             } else if self.can_write_continue() { | ||||
|                 try_nb!(self.flush()); | ||||
|             } else if self.can_read_body() { | ||||
|                 return self.read_body() | ||||
|                     .map(|async| async.map(|chunk| Some(Frame::Body { | ||||
|                         chunk: chunk | ||||
|                     }))) | ||||
|                     .or_else(|err| { | ||||
|                         self.state.close_read(); | ||||
|                         Ok(Async::Ready(Some(Frame::Error { error: err.into() }))) | ||||
|                     }); | ||||
|             } else { | ||||
|                 trace!("poll when on keep-alive"); | ||||
|                 self.maybe_park_read(); | ||||
|                 return Ok(Async::NotReady); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn is_read_closed(&self) -> bool { | ||||
|         self.state.is_read_closed() | ||||
|     } | ||||
| @@ -89,12 +118,9 @@ where I: AsyncRead + AsyncWrite, | ||||
|                 self.state.close_read(); | ||||
|                 self.io.consume_leading_lines(); | ||||
|                 let was_mid_parse = !self.io.read_buf().is_empty(); | ||||
|                 return if was_mid_parse { | ||||
|                 return if was_mid_parse || must_respond_with_error { | ||||
|                     debug!("parse error ({}) with {} bytes", e, self.io.read_buf().len()); | ||||
|                     Ok(Async::Ready(Some(Frame::Error { error: e }))) | ||||
|                 } else if must_respond_with_error { | ||||
|                     trace!("parse error with 0 input, err = {:?}", e); | ||||
|                     Ok(Async::Ready(Some(Frame::Error { error: e }))) | ||||
|                 } else { | ||||
|                     debug!("read eof"); | ||||
|                     Ok(Async::Ready(None)) | ||||
| @@ -379,32 +405,12 @@ where I: AsyncRead + AsyncWrite, | ||||
|     type Item = Frame<http::MessageHead<T::Incoming>, http::Chunk, ::Error>; | ||||
|     type Error = io::Error; | ||||
|  | ||||
|     #[inline] | ||||
|     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | ||||
|         trace!("Conn::poll()"); | ||||
|  | ||||
|         loop { | ||||
|             if self.is_read_closed() { | ||||
|                 trace!("Conn::poll when closed"); | ||||
|                 return Ok(Async::Ready(None)); | ||||
|             } else if self.can_read_head() { | ||||
|                 return self.read_head(); | ||||
|             } else if self.can_write_continue() { | ||||
|                 try_nb!(self.flush()); | ||||
|             } else if self.can_read_body() { | ||||
|                 return self.read_body() | ||||
|                     .map(|async| async.map(|chunk| Some(Frame::Body { | ||||
|                         chunk: chunk | ||||
|                     }))) | ||||
|                     .or_else(|err| { | ||||
|                         self.state.close_read(); | ||||
|                         Ok(Async::Ready(Some(Frame::Error { error: err.into() }))) | ||||
|                     }); | ||||
|             } else { | ||||
|                 trace!("poll when on keep-alive"); | ||||
|                 self.maybe_park_read(); | ||||
|                 return Ok(Async::NotReady); | ||||
|             } | ||||
|         } | ||||
|         self.poll2().map_err(|err| { | ||||
|             debug!("poll error: {}", err); | ||||
|             err | ||||
|         }) | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -460,16 +466,22 @@ where I: AsyncRead + AsyncWrite, | ||||
|  | ||||
|     } | ||||
|  | ||||
|     #[inline] | ||||
|     fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { | ||||
|         trace!("Conn::poll_complete()"); | ||||
|         let ret = self.flush(); | ||||
|         trace!("Conn::flush = {:?}", ret); | ||||
|         ret | ||||
|         self.flush().map_err(|err| { | ||||
|             debug!("error writing: {}", err); | ||||
|             err | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     #[inline] | ||||
|     fn close(&mut self) -> Poll<(), Self::SinkError> { | ||||
|         try_ready!(self.poll_complete()); | ||||
|         self.io.io_mut().shutdown() | ||||
|         self.io.io_mut().shutdown().map_err(|err| { | ||||
|             debug!("error closing: {}", err); | ||||
|             err | ||||
|         }) | ||||
|     } | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user