refactor(http1): notice eof in the try_keep_alive method
This commit is contained in:
@@ -239,7 +239,9 @@ where I: AsyncRead + AsyncWrite + Unpin,
|
|||||||
pub fn poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
|
pub fn poll_read_keep_alive(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
|
||||||
debug_assert!(!self.can_read_head() && !self.can_read_body());
|
debug_assert!(!self.can_read_head() && !self.can_read_body());
|
||||||
|
|
||||||
if self.is_mid_message() {
|
if self.is_read_closed() {
|
||||||
|
Poll::Pending
|
||||||
|
} else if self.is_mid_message() {
|
||||||
self.mid_message_detect_eof(cx)
|
self.mid_message_detect_eof(cx)
|
||||||
} else {
|
} else {
|
||||||
self.require_empty_read(cx)
|
self.require_empty_read(cx)
|
||||||
@@ -258,7 +260,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
|
|||||||
// This should only be called for Clients wanting to enter the idle
|
// This should only be called for Clients wanting to enter the idle
|
||||||
// state.
|
// state.
|
||||||
fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
|
fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
|
||||||
debug_assert!(!self.can_read_head() && !self.can_read_body());
|
debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
|
||||||
debug_assert!(!self.is_mid_message());
|
debug_assert!(!self.is_mid_message());
|
||||||
debug_assert!(T::is_client());
|
debug_assert!(T::is_client());
|
||||||
|
|
||||||
@@ -288,17 +290,13 @@ where I: AsyncRead + AsyncWrite + Unpin,
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
|
fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
|
||||||
debug_assert!(!self.can_read_head() && !self.can_read_body());
|
debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
|
||||||
debug_assert!(self.is_mid_message());
|
debug_assert!(self.is_mid_message());
|
||||||
|
|
||||||
if self.state.allow_half_close || !self.io.read_buf().is_empty() {
|
if self.state.allow_half_close || !self.io.read_buf().is_empty() {
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.state.is_read_closed() {
|
|
||||||
return Poll::Ready(Err(crate::Error::new_incomplete()));
|
|
||||||
}
|
|
||||||
|
|
||||||
let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
|
let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
|
||||||
|
|
||||||
if num_read == 0 {
|
if num_read == 0 {
|
||||||
@@ -347,7 +345,17 @@ where I: AsyncRead + AsyncWrite + Unpin,
|
|||||||
if !self.io.is_read_blocked() {
|
if !self.io.is_read_blocked() {
|
||||||
if self.io.read_buf().is_empty() {
|
if self.io.read_buf().is_empty() {
|
||||||
match self.io.poll_read_from_io(cx) {
|
match self.io.poll_read_from_io(cx) {
|
||||||
Poll::Ready(Ok(_)) => (),
|
Poll::Ready(Ok(n)) => {
|
||||||
|
if n == 0 {
|
||||||
|
trace!("maybe_notify; read eof");
|
||||||
|
if self.state.is_idle() {
|
||||||
|
self.state.close();
|
||||||
|
} else {
|
||||||
|
self.close_read()
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
trace!("maybe_notify; read_from_io blocked");
|
trace!("maybe_notify; read_from_io blocked");
|
||||||
return
|
return
|
||||||
@@ -355,6 +363,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
|
|||||||
Poll::Ready(Err(e)) => {
|
Poll::Ready(Err(e)) => {
|
||||||
trace!("maybe_notify; read_from_io error: {}", e);
|
trace!("maybe_notify; read_from_io error: {}", e);
|
||||||
self.state.close();
|
self.state.close();
|
||||||
|
self.state.error = Some(crate::Error::new_io(e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -716,6 +725,10 @@ impl fmt::Debug for State {
|
|||||||
builder.field("error", error);
|
builder.field("error", error);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if self.allow_half_close {
|
||||||
|
builder.field("allow_half_close", &true);
|
||||||
|
}
|
||||||
|
|
||||||
// Purposefully leaving off other fields..
|
// Purposefully leaving off other fields..
|
||||||
|
|
||||||
builder.finish()
|
builder.finish()
|
||||||
|
|||||||
@@ -1790,28 +1790,24 @@ mod conn {
|
|||||||
use futures_util::try_future::TryFutureExt;
|
use futures_util::try_future::TryFutureExt;
|
||||||
use futures_util::try_stream::TryStreamExt;
|
use futures_util::try_stream::TryStreamExt;
|
||||||
use tokio::runtime::current_thread::Runtime;
|
use tokio::runtime::current_thread::Runtime;
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
|
||||||
use tokio_net::tcp::TcpStream;
|
use tokio_net::tcp::{TcpListener as TkTcpListener, TcpStream};
|
||||||
|
|
||||||
use hyper::{self, Request, Body, Method};
|
use hyper::{self, Request, Body, Method};
|
||||||
use hyper::client::conn;
|
use hyper::client::conn;
|
||||||
|
|
||||||
use super::{s, tcp_connect, FutureHyperExt};
|
use super::{s, tcp_connect, FutureHyperExt};
|
||||||
|
|
||||||
#[test]
|
#[tokio::test]
|
||||||
fn get() {
|
async fn get() {
|
||||||
let server = TcpListener::bind("127.0.0.1:0").unwrap();
|
let _ = ::pretty_env_logger::try_init();
|
||||||
let addr = server.local_addr().unwrap();
|
let mut listener = TkTcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
let mut rt = Runtime::new().unwrap();
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
let (tx1, rx1) = oneshot::channel();
|
let server = async move {
|
||||||
|
let mut sock = listener.accept().await.unwrap().0;
|
||||||
thread::spawn(move || {
|
|
||||||
let mut sock = server.accept().unwrap().0;
|
|
||||||
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
|
|
||||||
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
|
|
||||||
let mut buf = [0; 4096];
|
let mut buf = [0; 4096];
|
||||||
let n = sock.read(&mut buf).expect("read 1");
|
let n = sock.read(&mut buf).await.expect("read 1");
|
||||||
|
|
||||||
// Notably:
|
// Notably:
|
||||||
// - Just a path, since just a path was set
|
// - Just a path, since just a path was set
|
||||||
@@ -1819,27 +1815,27 @@ mod conn {
|
|||||||
let expected = "GET /a HTTP/1.1\r\n\r\n";
|
let expected = "GET /a HTTP/1.1\r\n\r\n";
|
||||||
assert_eq!(s(&buf[..n]), expected);
|
assert_eq!(s(&buf[..n]), expected);
|
||||||
|
|
||||||
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap();
|
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").await.unwrap();
|
||||||
let _ = tx1.send(());
|
};
|
||||||
});
|
|
||||||
|
|
||||||
let tcp = rt.block_on(tcp_connect(&addr)).unwrap();
|
let client = async move {
|
||||||
|
let tcp = tcp_connect(&addr).await.expect("connect");
|
||||||
|
let (mut client, conn) = conn::handshake(tcp).await.expect("handshake");
|
||||||
|
|
||||||
let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap();
|
hyper::rt::spawn(async move {
|
||||||
|
conn.await.expect("http conn");
|
||||||
|
});
|
||||||
|
|
||||||
rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ()));
|
let req = Request::builder()
|
||||||
|
.uri("/a")
|
||||||
let req = Request::builder()
|
.body(Default::default())
|
||||||
.uri("/a")
|
.unwrap();
|
||||||
.body(Default::default())
|
let mut res = client.send_request(req).await.expect("send_request");
|
||||||
.unwrap();
|
|
||||||
let res = client.send_request(req).and_then(move |res| {
|
|
||||||
assert_eq!(res.status(), hyper::StatusCode::OK);
|
assert_eq!(res.status(), hyper::StatusCode::OK);
|
||||||
res.into_body().try_concat()
|
assert!(res.body_mut().next().await.is_none());
|
||||||
});
|
};
|
||||||
let rx = rx1.expect("thread panicked");
|
|
||||||
let rx = rx.then(|_| tokio_timer::delay_for(Duration::from_millis(200)));
|
future::join(server, client).await;
|
||||||
rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
Reference in New Issue
Block a user