diff --git a/src/lib.rs b/src/lib.rs index a52567d..36a5b42 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,6 +55,17 @@ pub struct Body { // ===== impl Body ===== +impl Body { + pub fn is_empty(&self) -> bool { + // If the recv side is closed and the receive queue is empty, the body is empty. + self.inner.body_is_empty() + } + + pub fn release_capacity(&mut self, sz: usize) -> Result<(), ConnectionError> { + self.inner.release_capacity(sz as proto::WindowSize) + } +} + impl futures::Stream for Body { type Item = Bytes; type Error = ConnectionError; diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 1011304..0134e7c 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -108,4 +108,13 @@ impl Deque { None => None, } } + + pub fn peek_front<'a>(&self, buf: &'a Buffer) -> Option<&'a Frame> { + match self.indices { + Some(idxs) => { + Some(&buf.slab[idxs.head].frame) + } + None => None, + } + } } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 6b63095..ad81030 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -225,6 +225,16 @@ impl Recv where B: Buf { Ok(()) } + pub fn body_is_empty(&self, stream: &store::Ptr) -> bool { + if !stream.state.is_recv_closed() { + return false; + } + + stream.pending_recv.peek_front(&self.buffer) + .map(|frame| !frame.is_data()) + .unwrap_or(true) + } + pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 0f9af08..f470505 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -1,4 +1,5 @@ use {frame, ConnectionError}; +use error::User::InactiveStreamId; use proto::*; use super::*; @@ -108,6 +109,23 @@ impl Send where B: Buf { stream.state.send_close() } + pub fn send_reset(&mut self, reason: Reason, + stream: &mut store::Ptr, + task: &mut Option) + -> Result<(), ConnectionError> + { + if stream.state.is_closed() { + return Err(InactiveStreamId.into()) + } + + stream.state.send_reset(reason)?; + + let frame = frame::Reset::new(stream.id, reason); + self.prioritize.queue_frame(frame.into(), stream, task); + + Ok(()) + } + pub fn send_data(&mut self, frame: frame::Data, stream: &mut store::Ptr, diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 45b4436..eb34c18 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -242,6 +242,19 @@ impl State { } } + /// Indicates that the local side will not send more data to the local. + pub fn send_reset(&mut self, reason: Reason) -> Result<(), ConnectionError> { + match self.inner { + Idle => Err(ProtocolError.into()), + Closed(..) => Ok(()), + _ => { + trace!("send_reset: => Closed"); + self.inner = Closed(Some(Cause::Proto(reason))); + Ok(()) + } + } + } + /// Returns true if a stream with the current state counts against the /// concurrency limit. pub fn is_counted(&self) -> bool { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 9f63196..198df8d 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -334,6 +334,16 @@ impl StreamRef me.actions.recv.take_request(&mut stream) } + pub fn send_reset(&mut self, reason: Reason) -> Result<(), ConnectionError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.key); + me.actions.transition::(stream, move |actions, stream| { + actions.send.send_reset(reason, stream, &mut actions.task) + }) + } + pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool) -> Result<(), ConnectionError> { @@ -350,6 +360,15 @@ impl StreamRef }) } + pub fn body_is_empty(&self) -> bool { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.key); + + me.actions.recv.body_is_empty(&stream) + } + pub fn poll_response(&mut self) -> Poll, ConnectionError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; diff --git a/src/server.rs b/src/server.rs index 9126f35..b386559 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,12 +1,12 @@ -use {frame, ConnectionError, StreamId}; -use Body; +use {frame, Body, ConnectionError, StreamId}; use proto::{self, Connection, WindowSize}; +use error::Reason; use error::Reason::*; use http::{self, Request, Response}; use futures::{self, Future, Sink, Poll, Async, AsyncSink, IntoFuture}; use tokio_io::{AsyncRead, AsyncWrite}; -use bytes::{Bytes, IntoBuf}; +use bytes::{Bytes, IntoBuf, Buf}; use std::fmt; @@ -191,6 +191,10 @@ impl Stream { { unimplemented!(); } + + pub fn send_reset(mut self, reason: Reason) -> Result<(), ConnectionError> { + self.inner.send_reset::(reason) + } } impl Stream {