diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 8ba74bd..7768bec 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -266,6 +266,27 @@ impl Prioritize { Ok(()) } + /// Reclaim all capacity assigned to the stream and re-assign it to the + /// connection + pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr) { + let available = stream.send_flow.available().as_size(); + stream.send_flow.claim_capacity(available); + // Re-assign all capacity to the connection + self.assign_connection_capacity(available, stream); + } + + /// Reclaim just reserved capacity, not buffered capacity, and re-assign + /// it to the connection + pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr) { + // only reclaim requested capacity that isn't already buffered + if stream.requested_send_capacity > stream.buffered_send_data { + let reserved = stream.requested_send_capacity - stream.buffered_send_data; + + stream.send_flow.claim_capacity(reserved); + self.assign_connection_capacity(reserved, stream); + } + } + pub fn assign_connection_capacity(&mut self, inc: WindowSize, store: &mut R) where R: Resolve, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 218954e..cff192b 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -155,6 +155,7 @@ impl Send { } pub fn schedule_cancel(&mut self, stream: &mut store::Ptr, task: &mut Option) { + trace!("schedule_cancel; {:?}", stream.id); if stream.state.is_closed() { // Stream is already closed, nothing more to do return; @@ -162,7 +163,7 @@ impl Send { stream.state.set_canceled(); - self.reclaim_capacity(stream); + self.prioritize.reclaim_reserved_capacity(stream); self.prioritize.schedule_send(stream, task); } @@ -285,17 +286,7 @@ impl Send { ) { // Clear all pending outbound frames self.prioritize.clear_queue(buffer, stream); - self.reclaim_capacity(stream); - } - - fn reclaim_capacity(&mut self, stream: &mut store::Ptr) { - // Reclaim all capacity assigned to the stream and re-assign it to the - // connection - let available = stream.send_flow.available().as_size(); - stream.send_flow.claim_capacity(available); - // Re-assign all capacity to the connection - self.prioritize - .assign_connection_capacity(available, stream); + self.prioritize.reclaim_all_capacity(stream); } pub fn apply_remote_settings( diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index e37c65b..658e621 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -243,7 +243,7 @@ impl Stream { /// /// In this case, a reset should be sent. pub fn is_canceled_interest(&self) -> bool { - self.ref_count == 0 && !self.state.is_recv_closed() + self.ref_count == 0 && !self.state.is_closed() } pub fn assign_capacity(&mut self, capacity: WindowSize) { diff --git a/tests/server.rs b/tests/server.rs index cf8a87f..16f2028 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -172,7 +172,7 @@ fn recv_connection_header() { } #[test] -fn sends_reset_cancel_when_body_is_dropped() { +fn sends_reset_cancel_when_req_body_is_dropped() { let _ = ::env_logger::init(); let (io, client) = mock::new(); @@ -203,3 +203,61 @@ fn sends_reset_cancel_when_body_is_dropped() { srv.join(client).wait().expect("wait"); } + +#[test] +fn sends_reset_cancel_when_res_body_is_dropped() { + let _ = ::env_logger::init(); + let (io, client) = mock::new(); + + let client = client + .assert_server_handshake() + .unwrap() + .recv_settings() + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos() + ) + .recv_frame(frames::headers(1).response(200)) + .recv_frame(frames::reset(1).cancel()) + .send_frame( + frames::headers(3) + .request("GET", "https://example.com/") + .eos() + ) + .recv_frame(frames::headers(3).response(200)) + .recv_frame(frames::data(3, vec![0; 10])) + .recv_frame(frames::reset(3).cancel()) + .close(); + + let srv = Server::handshake(io).expect("handshake").and_then(|srv| { + srv.into_future().unwrap().and_then(|(reqstream, srv)| { + let (req, mut stream) = reqstream.unwrap(); + + assert_eq!(req.method(), &http::Method::GET); + + let rsp = http::Response::builder() + .status(200) + .body(()) + .unwrap(); + stream.send_response(rsp, false).unwrap(); + // SendStream dropped + + srv.into_future().unwrap() + }).and_then(|(reqstream, srv)| { + let (_req, mut stream) = reqstream.unwrap(); + + let rsp = http::Response::builder() + .status(200) + .body(()) + .unwrap(); + let mut tx = stream.send_response(rsp, false).unwrap(); + tx.send_data(vec![0; 10].into(), false).unwrap(); + // no send_data with eos + + srv.into_future().unwrap() + }) + }); + + srv.join(client).wait().expect("wait"); +}