implement h2::server::Stream::send_reset(Reason) and Body::is_empty() (#22)

This commit is contained in:
Oliver Gould
2017-08-23 12:48:00 -07:00
committed by Carl Lerche
parent e8f757457b
commit f839443ece
7 changed files with 87 additions and 3 deletions

View File

@@ -55,6 +55,17 @@ pub struct Body<B: IntoBuf> {
// ===== impl Body =====
impl<B: IntoBuf> Body<B> {
pub fn is_empty(&self) -> bool {
// If the recv side is closed and the receive queue is empty, the body is empty.
self.inner.body_is_empty()
}
pub fn release_capacity(&mut self, sz: usize) -> Result<(), ConnectionError> {
self.inner.release_capacity(sz as proto::WindowSize)
}
}
impl<B: IntoBuf> futures::Stream for Body<B> {
type Item = Bytes;
type Error = ConnectionError;

View File

@@ -108,4 +108,13 @@ impl<B> Deque<B> {
None => None,
}
}
pub fn peek_front<'a>(&self, buf: &'a Buffer<B>) -> Option<&'a Frame<B>> {
match self.indices {
Some(idxs) => {
Some(&buf.slab[idxs.head].frame)
}
None => None,
}
}
}

View File

@@ -225,6 +225,16 @@ impl<B> Recv<B> where B: Buf {
Ok(())
}
pub fn body_is_empty(&self, stream: &store::Ptr<B>) -> bool {
if !stream.state.is_recv_closed() {
return false;
}
stream.pending_recv.peek_front(&self.buffer)
.map(|frame| !frame.is_data())
.unwrap_or(true)
}
pub fn recv_data(&mut self,
frame: frame::Data,
stream: &mut store::Ptr<B>)

View File

@@ -1,4 +1,5 @@
use {frame, ConnectionError};
use error::User::InactiveStreamId;
use proto::*;
use super::*;
@@ -108,6 +109,23 @@ impl<B> Send<B> where B: Buf {
stream.state.send_close()
}
pub fn send_reset(&mut self, reason: Reason,
stream: &mut store::Ptr<B>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
{
if stream.state.is_closed() {
return Err(InactiveStreamId.into())
}
stream.state.send_reset(reason)?;
let frame = frame::Reset::new(stream.id, reason);
self.prioritize.queue_frame(frame.into(), stream, task);
Ok(())
}
pub fn send_data(&mut self,
frame: frame::Data<B>,
stream: &mut store::Ptr<B>,

View File

@@ -242,6 +242,19 @@ impl State {
}
}
/// Indicates that the local side will not send more data to the local.
pub fn send_reset(&mut self, reason: Reason) -> Result<(), ConnectionError> {
match self.inner {
Idle => Err(ProtocolError.into()),
Closed(..) => Ok(()),
_ => {
trace!("send_reset: => Closed");
self.inner = Closed(Some(Cause::Proto(reason)));
Ok(())
}
}
}
/// Returns true if a stream with the current state counts against the
/// concurrency limit.
pub fn is_counted(&self) -> bool {

View File

@@ -334,6 +334,16 @@ impl<B> StreamRef<B>
me.actions.recv.take_request(&mut stream)
}
pub fn send_reset<P: Peer>(&mut self, reason: Reason) -> Result<(), ConnectionError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let stream = me.store.resolve(self.key);
me.actions.transition::<P, _, _>(stream, move |actions, stream| {
actions.send.send_reset(reason, stream, &mut actions.task)
})
}
pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool)
-> Result<(), ConnectionError>
{
@@ -350,6 +360,15 @@ impl<B> StreamRef<B>
})
}
pub fn body_is_empty(&self) -> bool {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let stream = me.store.resolve(self.key);
me.actions.recv.body_is_empty(&stream)
}
pub fn poll_response(&mut self) -> Poll<Response<()>, ConnectionError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

View File

@@ -1,12 +1,12 @@
use {frame, ConnectionError, StreamId};
use Body;
use {frame, Body, ConnectionError, StreamId};
use proto::{self, Connection, WindowSize};
use error::Reason;
use error::Reason::*;
use http::{self, Request, Response};
use futures::{self, Future, Sink, Poll, Async, AsyncSink, IntoFuture};
use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Bytes, IntoBuf};
use bytes::{Bytes, IntoBuf, Buf};
use std::fmt;
@@ -191,6 +191,10 @@ impl<B: IntoBuf> Stream<B> {
{
unimplemented!();
}
pub fn send_reset(mut self, reason: Reason) -> Result<(), ConnectionError> {
self.inner.send_reset::<Peer>(reason)
}
}
impl Stream<Bytes> {