notify stream send task when receiving EOF (#178)
* notify stream send task when receiving EOF * record a conn_error on eof so client can see it * fix stream id overflow test
This commit is contained in:
@@ -499,6 +499,7 @@ impl Recv {
|
|||||||
|
|
||||||
pub fn recv_eof(&mut self, stream: &mut Stream) {
|
pub fn recv_eof(&mut self, stream: &mut Stream) {
|
||||||
stream.state.recv_eof();
|
stream.state.recv_eof();
|
||||||
|
stream.notify_send();
|
||||||
stream.notify_recv();
|
stream.notify_recv();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -279,6 +279,10 @@ where
|
|||||||
let actions = &mut me.actions;
|
let actions = &mut me.actions;
|
||||||
let counts = &mut me.counts;
|
let counts = &mut me.counts;
|
||||||
|
|
||||||
|
if actions.conn_error.is_none() {
|
||||||
|
actions.conn_error = Some(io::Error::from(io::ErrorKind::BrokenPipe).into());
|
||||||
|
}
|
||||||
|
|
||||||
me.store
|
me.store
|
||||||
.for_each(|stream| {
|
.for_each(|stream| {
|
||||||
counts.transition(stream, |_, stream| {
|
counts.transition(stream, |_, stream| {
|
||||||
|
|||||||
@@ -142,7 +142,12 @@ fn request_stream_id_overflows() {
|
|||||||
.request("GET", "https://example.com/")
|
.request("GET", "https://example.com/")
|
||||||
.eos(),
|
.eos(),
|
||||||
)
|
)
|
||||||
.send_frame(frames::headers(::std::u32::MAX >> 1).response(200))
|
.send_frame(
|
||||||
|
frames::headers(::std::u32::MAX >> 1)
|
||||||
|
.response(200)
|
||||||
|
.eos()
|
||||||
|
)
|
||||||
|
.idle_ms(10)
|
||||||
.close();
|
.close();
|
||||||
|
|
||||||
h2.join(srv).wait().expect("wait");
|
h2.join(srv).wait().expect("wait");
|
||||||
@@ -362,6 +367,56 @@ fn connection_close_notifies_response_future() {
|
|||||||
client.join(srv).wait().expect("wait");
|
client.join(srv).wait().expect("wait");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn connection_close_notifies_client_poll_ready() {
|
||||||
|
let _ = ::env_logger::init();
|
||||||
|
let (io, srv) = mock::new();
|
||||||
|
|
||||||
|
let srv = srv.assert_client_handshake()
|
||||||
|
.unwrap()
|
||||||
|
.recv_settings()
|
||||||
|
.recv_frame(
|
||||||
|
frames::headers(1)
|
||||||
|
.request("GET", "https://http2.akamai.com/")
|
||||||
|
.eos(),
|
||||||
|
)
|
||||||
|
.close();
|
||||||
|
|
||||||
|
let client = Client::handshake(io)
|
||||||
|
.expect("handshake")
|
||||||
|
.and_then(|(mut client, conn)| {
|
||||||
|
let request = Request::builder()
|
||||||
|
.uri("https://http2.akamai.com/")
|
||||||
|
.body(())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let req = client
|
||||||
|
.send_request(request, true)
|
||||||
|
.expect("send_request1")
|
||||||
|
.0
|
||||||
|
.then(|res| {
|
||||||
|
let err = res.expect_err("response");
|
||||||
|
assert_eq!(
|
||||||
|
err.to_string(),
|
||||||
|
"broken pipe"
|
||||||
|
);
|
||||||
|
Ok::<_, ()>(())
|
||||||
|
});
|
||||||
|
|
||||||
|
conn.drive(req)
|
||||||
|
.and_then(move |(_conn, _)| {
|
||||||
|
let err = client.poll_ready().expect_err("poll_ready");
|
||||||
|
assert_eq!(
|
||||||
|
err.to_string(),
|
||||||
|
"broken pipe"
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
client.join(srv).wait().expect("wait");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn sending_request_on_closed_connection() {
|
fn sending_request_on_closed_connection() {
|
||||||
let _ = ::env_logger::init();
|
let _ = ::env_logger::init();
|
||||||
|
|||||||
Reference in New Issue
Block a user