fix(client): always wait on reads for pooled connections
This commit is contained in:
		| @@ -102,16 +102,7 @@ where I: AsyncRead + AsyncWrite, | |||||||
|  |  | ||||||
|     pub fn can_read_head(&self) -> bool { |     pub fn can_read_head(&self) -> bool { | ||||||
|         match self.state.reading { |         match self.state.reading { | ||||||
|             Reading::Init => { |             Reading::Init => true, | ||||||
|                 if T::should_read_first() { |  | ||||||
|                     true |  | ||||||
|                 } else { |  | ||||||
|                     match self.state.writing { |  | ||||||
|                         Writing::Init => false, |  | ||||||
|                         _ => true, |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|             _ => false, |             _ => false, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| @@ -245,19 +236,13 @@ where I: AsyncRead + AsyncWrite, | |||||||
|             Reading::Closed => false, |             Reading::Closed => false, | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         let wants_write = match self.state.writing { |         match self.state.writing { | ||||||
|             Writing::Continue(..) | |             Writing::Continue(..) | | ||||||
|             Writing::Body(..) | |             Writing::Body(..) | | ||||||
|             Writing::Ending(..) => return, |             Writing::Ending(..) => return, | ||||||
|             Writing::Init => true, |             Writing::Init | | ||||||
|             Writing::KeepAlive => false, |             Writing::KeepAlive | | ||||||
|             Writing::Closed => false, |             Writing::Closed => (), | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         // if the client is at Reading::Init and Writing::Init, |  | ||||||
|         // it's not actually looking for a read, but a write. |  | ||||||
|         if wants_write && !T::should_read_first() { |  | ||||||
|             return; |  | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         if !self.io.is_read_blocked() { |         if !self.io.is_read_blocked() { | ||||||
|   | |||||||
| @@ -1,3 +1,5 @@ | |||||||
|  | use std::io; | ||||||
|  |  | ||||||
| use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; | use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; | ||||||
| use futures::sync::{mpsc, oneshot}; | use futures::sync::{mpsc, oneshot}; | ||||||
| use tokio_io::{AsyncRead, AsyncWrite}; | use tokio_io::{AsyncRead, AsyncWrite}; | ||||||
| @@ -64,7 +66,7 @@ where | |||||||
|                         } else { |                         } else { | ||||||
|                             None |                             None | ||||||
|                         }; |                         }; | ||||||
|                         self.dispatch.recv_msg(Ok((head, body))).expect("recv_msg with Ok shouldn't error"); |                         self.dispatch.recv_msg(Ok((head, body)))?; | ||||||
|                     }, |                     }, | ||||||
|                     Ok(Async::Ready(None)) => { |                     Ok(Async::Ready(None)) => { | ||||||
|                         // read eof, conn will start to shutdown automatically |                         // read eof, conn will start to shutdown automatically | ||||||
| @@ -306,10 +308,13 @@ where | |||||||
|     fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Option<Body>)>) -> ::Result<()> { |     fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Option<Body>)>) -> ::Result<()> { | ||||||
|         match msg { |         match msg { | ||||||
|             Ok((msg, body)) => { |             Ok((msg, body)) => { | ||||||
|  |                 if let Some(cb) = self.callback.take() { | ||||||
|                     let res = super::response::from_wire(msg, body); |                     let res = super::response::from_wire(msg, body); | ||||||
|                 let cb = self.callback.take().expect("recv_msg without callback"); |  | ||||||
|                     let _ = cb.send(Ok(res)); |                     let _ = cb.send(Ok(res)); | ||||||
|                     Ok(()) |                     Ok(()) | ||||||
|  |                 } else { | ||||||
|  |                     Err(::Error::Io(io::Error::new(io::ErrorKind::InvalidData, "response received without matching request"))) | ||||||
|  |                 } | ||||||
|             }, |             }, | ||||||
|             Err(err) => { |             Err(err) => { | ||||||
|                 if let Some(cb) = self.callback.take() { |                 if let Some(cb) = self.callback.take() { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user