feat(server): add http1_half_close(bool) option
This option determines whether a read EOF should close the connection automatically. The behavior was to always allow read EOF while waiting to respond, so this option has a default of `true`. Setting this option to `false` will allow Service futures to be canceled as soon as disconnect is noticed. Closes #1716
This commit is contained in:
@@ -38,6 +38,7 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
Conn {
|
Conn {
|
||||||
io: Buffered::new(io),
|
io: Buffered::new(io),
|
||||||
state: State {
|
state: State {
|
||||||
|
allow_half_close: true,
|
||||||
cached_headers: None,
|
cached_headers: None,
|
||||||
error: None,
|
error: None,
|
||||||
keep_alive: KA::Busy,
|
keep_alive: KA::Busy,
|
||||||
@@ -75,6 +76,10 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
self.state.title_case_headers = true;
|
self.state.title_case_headers = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn set_disable_half_close(&mut self) {
|
||||||
|
self.state.allow_half_close = false;
|
||||||
|
}
|
||||||
|
|
||||||
pub fn into_inner(self) -> (I, Bytes) {
|
pub fn into_inner(self) -> (I, Bytes) {
|
||||||
self.io.into_inner()
|
self.io.into_inner()
|
||||||
}
|
}
|
||||||
@@ -228,10 +233,11 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
|
|
||||||
trace!("read_keep_alive; is_mid_message={}", self.is_mid_message());
|
trace!("read_keep_alive; is_mid_message={}", self.is_mid_message());
|
||||||
|
|
||||||
if !self.is_mid_message() {
|
if self.is_mid_message() {
|
||||||
self.require_empty_read().map_err(::Error::new_io)?;
|
self.mid_message_detect_eof().map_err(::Error::new_io)
|
||||||
|
} else {
|
||||||
|
self.require_empty_read().map_err(::Error::new_io)
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_mid_message(&self) -> bool {
|
fn is_mid_message(&self) -> bool {
|
||||||
@@ -252,7 +258,7 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
// This should only be called for Clients wanting to enter the idle
|
// This should only be called for Clients wanting to enter the idle
|
||||||
// state.
|
// state.
|
||||||
fn require_empty_read(&mut self) -> io::Result<()> {
|
fn require_empty_read(&mut self) -> io::Result<()> {
|
||||||
assert!(!self.can_read_head() && !self.can_read_body());
|
debug_assert!(!self.can_read_head() && !self.can_read_body());
|
||||||
|
|
||||||
if !self.io.read_buf().is_empty() {
|
if !self.io.read_buf().is_empty() {
|
||||||
debug!("received an unexpected {} bytes", self.io.read_buf().len());
|
debug!("received an unexpected {} bytes", self.io.read_buf().len());
|
||||||
@@ -279,11 +285,21 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn mid_message_detect_eof(&mut self) -> io::Result<()> {
|
||||||
|
debug_assert!(!self.can_read_head() && !self.can_read_body());
|
||||||
|
|
||||||
|
if self.state.allow_half_close || !self.io.read_buf().is_empty() {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
self.try_io_read().map(|_| ())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn try_io_read(&mut self) -> Poll<usize, io::Error> {
|
fn try_io_read(&mut self) -> Poll<usize, io::Error> {
|
||||||
match self.io.read_from_io() {
|
match self.io.read_from_io() {
|
||||||
Ok(Async::Ready(0)) => {
|
Ok(Async::Ready(0)) => {
|
||||||
trace!("try_io_read; found EOF on connection: {:?}", self.state);
|
trace!("try_io_read; found EOF on connection: {:?}", self.state);
|
||||||
let must_error = self.should_error_on_eof();
|
let must_error = !self.state.is_idle();
|
||||||
let ret = if must_error {
|
let ret = if must_error {
|
||||||
let desc = if self.is_mid_message() {
|
let desc = if self.is_mid_message() {
|
||||||
"unexpected EOF waiting for response"
|
"unexpected EOF waiting for response"
|
||||||
@@ -655,6 +671,7 @@ impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct State {
|
struct State {
|
||||||
|
allow_half_close: bool,
|
||||||
/// Re-usable HeaderMap to reduce allocating new ones.
|
/// Re-usable HeaderMap to reduce allocating new ones.
|
||||||
cached_headers: Option<HeaderMap>,
|
cached_headers: Option<HeaderMap>,
|
||||||
/// If an error occurs when there wasn't a direct way to return it
|
/// If an error occurs when there wasn't a direct way to return it
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ pub(super) use self::upgrades::UpgradeableConnection;
|
|||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Http<E = Exec> {
|
pub struct Http<E = Exec> {
|
||||||
exec: E,
|
exec: E,
|
||||||
|
h1_half_close: bool,
|
||||||
h1_writev: bool,
|
h1_writev: bool,
|
||||||
mode: ConnectionMode,
|
mode: ConnectionMode,
|
||||||
keep_alive: bool,
|
keep_alive: bool,
|
||||||
@@ -163,6 +164,7 @@ impl Http {
|
|||||||
pub fn new() -> Http {
|
pub fn new() -> Http {
|
||||||
Http {
|
Http {
|
||||||
exec: Exec::Default,
|
exec: Exec::Default,
|
||||||
|
h1_half_close: true,
|
||||||
h1_writev: true,
|
h1_writev: true,
|
||||||
mode: ConnectionMode::Fallback,
|
mode: ConnectionMode::Fallback,
|
||||||
keep_alive: true,
|
keep_alive: true,
|
||||||
@@ -195,6 +197,20 @@ impl<E> Http<E> {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set whether HTTP/1 connections should support half-closures.
|
||||||
|
///
|
||||||
|
/// Clients can chose to shutdown their write-side while waiting
|
||||||
|
/// for the server to respond. Setting this to `false` will
|
||||||
|
/// automatically close any connection immediately if `read`
|
||||||
|
/// detects an EOF.
|
||||||
|
///
|
||||||
|
/// Default is `true`.
|
||||||
|
#[inline]
|
||||||
|
pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
|
||||||
|
self.h1_half_close = val;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Set whether HTTP/1 connections should try to use vectored writes,
|
/// Set whether HTTP/1 connections should try to use vectored writes,
|
||||||
/// or always flatten into a single buffer.
|
/// or always flatten into a single buffer.
|
||||||
///
|
///
|
||||||
@@ -261,6 +277,7 @@ impl<E> Http<E> {
|
|||||||
pub fn with_executor<E2>(self, exec: E2) -> Http<E2> {
|
pub fn with_executor<E2>(self, exec: E2) -> Http<E2> {
|
||||||
Http {
|
Http {
|
||||||
exec,
|
exec,
|
||||||
|
h1_half_close: self.h1_half_close,
|
||||||
h1_writev: self.h1_writev,
|
h1_writev: self.h1_writev,
|
||||||
mode: self.mode,
|
mode: self.mode,
|
||||||
keep_alive: self.keep_alive,
|
keep_alive: self.keep_alive,
|
||||||
@@ -319,6 +336,9 @@ impl<E> Http<E> {
|
|||||||
if !self.keep_alive {
|
if !self.keep_alive {
|
||||||
conn.disable_keep_alive();
|
conn.disable_keep_alive();
|
||||||
}
|
}
|
||||||
|
if !self.h1_half_close {
|
||||||
|
conn.set_disable_half_close();
|
||||||
|
}
|
||||||
if !self.h1_writev {
|
if !self.h1_writev {
|
||||||
conn.set_write_strategy_flatten();
|
conn.set_write_strategy_flatten();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -247,6 +247,20 @@ impl<I, E> Builder<I, E> {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Set whether HTTP/1 connections should support half-closures.
|
||||||
|
///
|
||||||
|
/// Clients can chose to shutdown their write-side while waiting
|
||||||
|
/// for the server to respond. Setting this to `false` will
|
||||||
|
/// automatically close any connection immediately if `read`
|
||||||
|
/// detects an EOF.
|
||||||
|
///
|
||||||
|
/// Default is `true`.
|
||||||
|
pub fn http1_half_close(mut self, val: bool) -> Self {
|
||||||
|
self.protocol.http1_half_close(val);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Sets whether HTTP/1 is required.
|
/// Sets whether HTTP/1 is required.
|
||||||
///
|
///
|
||||||
/// Default is `false`.
|
/// Default is `false`.
|
||||||
|
|||||||
@@ -991,6 +991,71 @@ fn nonempty_parse_eof_returns_error() {
|
|||||||
rt.block_on(fut).expect_err("partial parse eof is error");
|
rt.block_on(fut).expect_err("partial parse eof is error");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn socket_half_closed() {
|
||||||
|
let _ = pretty_env_logger::try_init();
|
||||||
|
let mut rt = Runtime::new().unwrap();
|
||||||
|
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
let mut tcp = connect(&addr);
|
||||||
|
tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
|
||||||
|
tcp.shutdown(::std::net::Shutdown::Write).expect("SHDN_WR");
|
||||||
|
|
||||||
|
let mut buf = [0; 256];
|
||||||
|
tcp.read(&mut buf).unwrap();
|
||||||
|
let expected = "HTTP/1.1 200 OK\r\n";
|
||||||
|
assert_eq!(s(&buf[..expected.len()]), expected);
|
||||||
|
});
|
||||||
|
|
||||||
|
let fut = listener.incoming()
|
||||||
|
.into_future()
|
||||||
|
.map_err(|_| unreachable!())
|
||||||
|
.and_then(|(item, _incoming)| {
|
||||||
|
let socket = item.unwrap();
|
||||||
|
Http::new()
|
||||||
|
.serve_connection(socket, service_fn(|_| {
|
||||||
|
Delay::new(Duration::from_millis(500))
|
||||||
|
.map(|_| {
|
||||||
|
Response::new(Body::empty())
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
});
|
||||||
|
|
||||||
|
rt.block_on(fut).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn disconnect_after_reading_request_before_responding() {
|
||||||
|
let _ = pretty_env_logger::try_init();
|
||||||
|
let mut rt = Runtime::new().unwrap();
|
||||||
|
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
let mut tcp = connect(&addr);
|
||||||
|
tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
let fut = listener.incoming()
|
||||||
|
.into_future()
|
||||||
|
.map_err(|_| unreachable!())
|
||||||
|
.and_then(|(item, _incoming)| {
|
||||||
|
let socket = item.unwrap();
|
||||||
|
Http::new()
|
||||||
|
.http1_half_close(false)
|
||||||
|
.serve_connection(socket, service_fn(|_| {
|
||||||
|
Delay::new(Duration::from_secs(2))
|
||||||
|
.map(|_| -> Response<Body> {
|
||||||
|
panic!("response future should have been dropped");
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
});
|
||||||
|
|
||||||
|
rt.block_on(fut).expect_err("socket disconnected");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn returning_1xx_response_is_error() {
|
fn returning_1xx_response_is_error() {
|
||||||
let mut rt = Runtime::new().unwrap();
|
let mut rt = Runtime::new().unwrap();
|
||||||
|
|||||||
Reference in New Issue
Block a user