diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index aa9745e..be44d6e 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -499,6 +499,7 @@ impl Recv { pub fn recv_eof(&mut self, stream: &mut Stream) { stream.state.recv_eof(); + stream.notify_send(); stream.notify_recv(); } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 3dd449d..38e2d82 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -279,6 +279,10 @@ where let actions = &mut me.actions; let counts = &mut me.counts; + if actions.conn_error.is_none() { + actions.conn_error = Some(io::Error::from(io::ErrorKind::BrokenPipe).into()); + } + me.store .for_each(|stream| { counts.transition(stream, |_, stream| { diff --git a/tests/client_request.rs b/tests/client_request.rs index f2cb613..b8ae4d7 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -142,7 +142,12 @@ fn request_stream_id_overflows() { .request("GET", "https://example.com/") .eos(), ) - .send_frame(frames::headers(::std::u32::MAX >> 1).response(200)) + .send_frame( + frames::headers(::std::u32::MAX >> 1) + .response(200) + .eos() + ) + .idle_ms(10) .close(); h2.join(srv).wait().expect("wait"); @@ -362,6 +367,56 @@ fn connection_close_notifies_response_future() { client.join(srv).wait().expect("wait"); } +#[test] +fn connection_close_notifies_client_poll_ready() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .close(); + + let client = Client::handshake(io) + .expect("handshake") + .and_then(|(mut client, conn)| { + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + + let req = client + .send_request(request, true) + .expect("send_request1") + .0 + .then(|res| { + let err = res.expect_err("response"); + assert_eq!( + err.to_string(), + "broken pipe" + ); + Ok::<_, ()>(()) + }); + + conn.drive(req) + .and_then(move |(_conn, _)| { + let err = client.poll_ready().expect_err("poll_ready"); + assert_eq!( + err.to_string(), + "broken pipe" + ); + Ok(()) + }) + }); + + client.join(srv).wait().expect("wait"); +} + #[test] fn sending_request_on_closed_connection() { let _ = ::env_logger::init();