feat(server): change http1_half_close option default to disabled

Detecting a read hangup is a useful way to determine that a connection
has closed. It's also possible that a client shuts down its read half
without closing the connection, but this is rarer. Thus, by default,
hyper will now assume a read EOF means the connection has closed.

BREAKING CHANGE: The server's behavior will now by default close
  connections when receiving a read EOF. To allow for clients to close
  the read half, call `http1_half_close(true)` when configuring a
  server.
This commit is contained in:
Sean McArthur
2019-10-18 13:08:06 -07:00
parent 8e7ebd80cd
commit 7e31fd88a8
5 changed files with 67 additions and 45 deletions

View File

@@ -38,7 +38,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
Conn { Conn {
io: Buffered::new(io), io: Buffered::new(io),
state: State { state: State {
allow_half_close: true, allow_half_close: false,
cached_headers: None, cached_headers: None,
error: None, error: None,
keep_alive: KA::Busy, keep_alive: KA::Busy,
@@ -76,8 +76,8 @@ where I: AsyncRead + AsyncWrite + Unpin,
self.state.title_case_headers = true; self.state.title_case_headers = true;
} }
pub(crate) fn set_disable_half_close(&mut self) { pub(crate) fn set_allow_half_close(&mut self) {
self.state.allow_half_close = false; self.state.allow_half_close = true;
} }
pub fn into_inner(self) -> (I, Bytes) { pub fn into_inner(self) -> (I, Bytes) {
@@ -172,7 +172,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
// message should be reported as an error. If not, it is just // message should be reported as an error. If not, it is just
// the connection closing gracefully. // the connection closing gracefully.
let must_error = self.should_error_on_eof(); let must_error = self.should_error_on_eof();
self.state.close_read(); self.close_read();
self.io.consume_leading_lines(); self.io.consume_leading_lines();
let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty(); let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
if was_mid_parse || must_error { if was_mid_parse || must_error {
@@ -185,6 +185,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
} }
} else { } else {
debug!("read eof"); debug!("read eof");
self.close_write();
Poll::Ready(None) Poll::Ready(None)
} }
} }
@@ -204,7 +205,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
None None
}) })
} else if slice.is_empty() { } else if slice.is_empty() {
error!("decode stream unexpectedly ended"); error!("incoming body unexpectedly ended");
// This should be unreachable, since all 3 decoders // This should be unreachable, since all 3 decoders
// either set eof=true or return an Err when reading // either set eof=true or return an Err when reading
// an empty slice... // an empty slice...
@@ -216,7 +217,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
}, },
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
debug!("decode stream error: {}", e); debug!("incoming body decode error: {}", e);
(Reading::Closed, Poll::Ready(Some(Err(e)))) (Reading::Closed, Poll::Ready(Some(Err(e))))
}, },
} }
@@ -294,6 +295,10 @@ where I: AsyncRead + AsyncWrite + Unpin,
return Poll::Pending; return Poll::Pending;
} }
if self.state.is_read_closed() {
return Poll::Ready(Err(crate::Error::new_incomplete()));
}
let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
if num_read == 0 { if num_read == 0 {
@@ -306,6 +311,8 @@ where I: AsyncRead + AsyncWrite + Unpin,
} }
fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> { fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
debug_assert!(!self.state.is_read_closed());
let result = ready!(self.io.poll_read_from_io(cx)); let result = ready!(self.io.poll_read_from_io(cx));
Poll::Ready(result.map_err(|e| { Poll::Ready(result.map_err(|e| {
trace!("force_io_read; io error = {:?}", e); trace!("force_io_read; io error = {:?}", e);
@@ -619,8 +626,10 @@ where I: AsyncRead + AsyncWrite + Unpin,
pub fn disable_keep_alive(&mut self) { pub fn disable_keep_alive(&mut self) {
if self.state.is_idle() { if self.state.is_idle() {
self.state.close_read(); trace!("disable_keep_alive; closing idle connection");
self.state.close();
} else { } else {
trace!("disable_keep_alive; in-progress connection");
self.state.disable_keep_alive(); self.state.disable_keep_alive();
} }
} }

View File

@@ -60,7 +60,10 @@ where
} }
pub fn disable_keep_alive(&mut self) { pub fn disable_keep_alive(&mut self) {
self.conn.disable_keep_alive() self.conn.disable_keep_alive();
if self.conn.is_write_closed() {
self.close();
}
} }
pub fn into_inner(self) -> (I, Bytes, D) { pub fn into_inner(self) -> (I, Bytes, D) {
@@ -233,10 +236,17 @@ where
// if here, the dispatcher gave the user the error // if here, the dispatcher gave the user the error
// somewhere else. we still need to shutdown, but // somewhere else. we still need to shutdown, but
// not as a second error. // not as a second error.
self.close();
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
}, },
None => { None => {
// read eof, conn will start to shutdown automatically // read eof, the write side will have been closed too unless
// allow_read_close was set to true, in which case just do
// nothing...
debug_assert!(self.conn.is_read_closed());
if self.conn.is_write_closed() {
self.close();
}
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
} }

View File

@@ -194,7 +194,7 @@ impl Http {
Http { Http {
exec: Exec::Default, exec: Exec::Default,
h1_half_close: true, h1_half_close: false,
h1_writev: true, h1_writev: true,
h2_builder, h2_builder,
mode: ConnectionMode::Fallback, mode: ConnectionMode::Fallback,
@@ -221,12 +221,11 @@ impl<E> Http<E> {
/// Set whether HTTP/1 connections should support half-closures. /// Set whether HTTP/1 connections should support half-closures.
/// ///
/// Clients can chose to shutdown their write-side while waiting /// Clients can chose to shutdown their write-side while waiting
/// for the server to respond. Setting this to `false` will /// for the server to respond. Setting this to `true` will
/// automatically close any connection immediately if `read` /// prevent closing the connection immediately if `read`
/// detects an EOF. /// detects an EOF in the middle of a request.
/// ///
/// Default is `true`. /// Default is `false`.
#[inline]
pub fn http1_half_close(&mut self, val: bool) -> &mut Self { pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
self.h1_half_close = val; self.h1_half_close = val;
self self
@@ -390,8 +389,8 @@ impl<E> Http<E> {
if !self.keep_alive { if !self.keep_alive {
conn.disable_keep_alive(); conn.disable_keep_alive();
} }
if !self.h1_half_close { if self.h1_half_close {
conn.set_disable_half_close(); conn.set_allow_half_close();
} }
if !self.h1_writev { if !self.h1_writev {
conn.set_write_strategy_flatten(); conn.set_write_strategy_flatten();

View File

@@ -252,11 +252,11 @@ impl<I, E> Builder<I, E> {
/// Set whether HTTP/1 connections should support half-closures. /// Set whether HTTP/1 connections should support half-closures.
/// ///
/// Clients can chose to shutdown their write-side while waiting /// Clients can chose to shutdown their write-side while waiting
/// for the server to respond. Setting this to `false` will /// for the server to respond. Setting this to `true` will
/// automatically close any connection immediately if `read` /// prevent closing the connection immediately if `read`
/// detects an EOF. /// detects an EOF in the middle of a request.
/// ///
/// Default is `true`. /// Default is `false`.
pub fn http1_half_close(mut self, val: bool) -> Self { pub fn http1_half_close(mut self, val: bool) -> Self {
self.protocol.http1_half_close(val); self.protocol.http1_half_close(val);
self self

View File

@@ -945,6 +945,7 @@ fn disable_keep_alive_post_request() {
#[test] #[test]
fn empty_parse_eof_does_not_return_error() { fn empty_parse_eof_does_not_return_error() {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
@@ -983,13 +984,13 @@ fn nonempty_parse_eof_returns_error() {
} }
#[test] #[test]
fn socket_half_closed() { fn http1_allow_half_close() {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
thread::spawn(move || { let t1 = thread::spawn(move || {
let mut tcp = connect(&addr); let mut tcp = connect(&addr);
tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
tcp.shutdown(::std::net::Shutdown::Write).expect("SHDN_WR"); tcp.shutdown(::std::net::Shutdown::Write).expect("SHDN_WR");
@@ -1005,13 +1006,16 @@ fn socket_half_closed() {
.map(Option::unwrap) .map(Option::unwrap)
.map_err(|_| unreachable!()) .map_err(|_| unreachable!())
.and_then(|socket| { .and_then(|socket| {
Http::new().serve_connection(socket, service_fn(|_| { Http::new()
.http1_half_close(true)
.serve_connection(socket, service_fn(|_| {
tokio_timer::delay_for(Duration::from_millis(500)) tokio_timer::delay_for(Duration::from_millis(500))
.map(|_| Ok::<_, hyper::Error>(Response::new(Body::empty()))) .map(|_| Ok::<_, hyper::Error>(Response::new(Body::empty())))
})) }))
}); });
rt.block_on(fut).unwrap(); rt.block_on(fut).unwrap();
t1.join().expect("client thread");
} }
#[test] #[test]
@@ -1852,28 +1856,28 @@ impl tower_service::Service<Request<Body>> for TestService {
Ok(()).into() Ok(()).into()
} }
fn call(&mut self, req: Request<Body>) -> Self::Future { fn call(&mut self, mut req: Request<Body>) -> Self::Future {
let tx1 = self.tx.clone(); let tx = self.tx.clone();
let tx2 = self.tx.clone();
let replies = self.reply.clone(); let replies = self.reply.clone();
req hyper::rt::spawn(async move {
.into_body() while let Some(chunk) = req.body_mut().next().await {
.try_concat() match chunk {
.map_ok(move |chunk| { Ok(chunk) => {
tx1.send(Msg::Chunk(chunk.to_vec())).unwrap(); tx.send(Msg::Chunk(chunk.to_vec())).unwrap();
() },
}) Err(err) => {
.map(move |result| { tx.send(Msg::Error(err)).unwrap();
let msg = match result { return;
Ok(()) => Msg::End, },
Err(e) => Msg::Error(e), }
}; }
tx2.send(msg).unwrap();
}) tx.send(Msg::End).unwrap();
.map(move |_| { });
TestService::build_reply(replies)
}) Box::pin(async move {
.boxed() TestService::build_reply(replies)
})
} }
} }