notify stream refs when the connection receives EOF (#176)

This commit is contained in:
Sean McArthur
2017-11-28 13:42:22 -08:00
committed by GitHub
parent 79003d0d45
commit 2be2523162
5 changed files with 76 additions and 1 deletions

View File

@@ -277,8 +277,8 @@ where
// TODO: handle
},
None => {
// TODO: Is this correct?
trace!("codec closed");
self.streams.recv_eof();
return Ok(Async::Ready(()));
},
}

View File

@@ -497,6 +497,11 @@ impl Recv {
stream.notify_recv();
}
pub fn recv_eof(&mut self, stream: &mut Stream) {
stream.state.recv_eof();
stream.notify_recv();
}
fn next_stream_id(&self) -> Result<StreamId, RecvError> {
if let Ok(id) = self.next_stream_id {
Ok(id)

View File

@@ -223,6 +223,16 @@ impl State {
}
}
pub fn recv_eof(&mut self) {
match self.inner {
Closed(..) => {},
s => {
trace!("recv_eof; state={:?}", s);
self.inner = Closed(Some(Cause::Io));
}
}
}
/// Indicates that the local side will not send more data to the local.
pub fn send_close(&mut self) {
match self.inner {

View File

@@ -272,6 +272,23 @@ where
actions.conn_error = Some(err);
}
pub fn recv_eof(&mut self) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let actions = &mut me.actions;
let counts = &mut me.counts;
me.store
.for_each(|stream| {
counts.transition(stream, |_, stream| {
actions.recv.recv_eof(stream);
Ok::<_, ()>(())
})
})
.expect("recv_eof");
}
pub fn last_processed_id(&self) -> StreamId {
self.inner.lock().unwrap().actions.recv.last_processed_id()
}

View File

@@ -319,6 +319,49 @@ fn request_with_connection_headers() {
client.join(srv).wait().expect("wait");
}
#[test]
fn connection_close_notifies_response_future() {
let _ = ::env_logger::init();
let (io, srv) = mock::new();
let srv = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
// don't send any response, just close
.close();
let client = Client::handshake(io)
.expect("handshake")
.and_then(|(mut client, conn)| {
let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = client
.send_request(request, true)
.expect("send_request1")
.0
.then(|res| {
let err = res.expect_err("response");
assert_eq!(
err.to_string(),
"broken pipe"
);
Ok(())
});
conn.expect("conn").join(req)
});
client.join(srv).wait().expect("wait");
}
#[test]
fn sending_request_on_closed_connection() {
let _ = ::env_logger::init();