Wire in trailers (#34)

Add send and receive trailer support.
This commit is contained in:
Carl Lerche
2017-08-25 10:20:47 -07:00
committed by GitHub
parent c0433e8831
commit 11d5f95236
12 changed files with 365 additions and 25 deletions

View File

@@ -43,6 +43,8 @@ impl<B> Prioritize<B>
flow.assign_capacity(config.init_local_window_sz);
trace!("Prioritize::new; flow={:?}", flow);
Prioritize {
pending_send: store::Queue::new(),
pending_capacity: store::Queue::new(),

View File

@@ -1,4 +1,4 @@
use {client, server, frame, ConnectionError};
use {client, server, frame, HeaderMap, ConnectionError};
use proto::*;
use super::*;
@@ -525,19 +525,15 @@ impl<B> Recv<B> where B: Buf {
-> Poll<Option<Bytes>, ConnectionError>
{
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Frame::Data(frame)) => {
Ok(Some(frame.into_payload()).into())
}
Some(frame) => {
match frame {
Frame::Data(frame) => {
Ok(Some(frame.into_payload()).into())
}
frame => {
// Frame is trailer
stream.pending_recv.push_front(&mut self.buffer, frame);
// Frame is trailer
stream.pending_recv.push_front(&mut self.buffer, frame);
// No more data frames
Ok(None.into())
}
}
// No more data frames
Ok(None.into())
}
None => {
if stream.state.is_recv_closed() {
@@ -552,6 +548,32 @@ impl<B> Recv<B> where B: Buf {
}
}
pub fn poll_trailers(&mut self, stream: &mut Stream<B>)
-> Poll<Option<HeaderMap>, ConnectionError>
{
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Frame::Headers(frame)) => {
Ok(Some(frame.into_fields()).into())
}
Some(_) => {
// TODO: This is a user error. `poll_trailers` was called before
// the entire set of data frames have been consumed. What should
// we do?
unimplemented!();
}
None => {
if stream.state.is_recv_closed() {
// There will be no trailer frame
Ok(None.into())
} else {
// Request to get notified once another frame arrives
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
}
}
}
}
fn reset(&mut self, _stream_id: StreamId, _reason: Reason) {
unimplemented!();
}

View File

@@ -133,6 +133,25 @@ impl<B> Send<B> where B: Buf {
self.prioritize.send_data(frame, stream, task)
}
pub fn send_trailers(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
{
// TODO: Should this logic be moved into state.rs?
if !stream.state.is_send_streaming() {
return Err(UnexpectedFrameType.into());
}
stream.state.send_close()?;
trace!("send_trailers -- queuing; frame={:?}", frame);
self.prioritize.queue_frame(frame.into(), stream, task);
Ok(())
}
pub fn poll_complete<T>(&mut self,
store: &mut Store<B>,
dst: &mut Codec<T, Prioritized<B>>)

View File

@@ -1,4 +1,4 @@
use {client, server};
use {client, server, HeaderMap};
use proto::*;
use super::*;
@@ -335,6 +335,22 @@ impl<B> StreamRef<B>
})
}
pub fn send_trailers<P: Peer>(&mut self, trailers: HeaderMap) -> Result<(), ConnectionError>
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let stream = me.store.resolve(self.key);
// Create the trailers frame
let frame = frame::Headers::trailers(stream.id, trailers);
me.actions.transition::<P, _, _>(stream, |actions, stream| {
// Send the trailers frame
actions.send.send_trailers(frame, stream, &mut actions.task)
})
}
/// Called by the server after the stream is accepted. Given that clients
/// initialize streams by sending HEADERS, the request will always be
/// available.
@@ -403,6 +419,15 @@ impl<B> StreamRef<B>
me.actions.recv.poll_data(&mut stream)
}
pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ConnectionError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.key);
me.actions.recv.poll_trailers(&mut stream)
}
/// Releases recv capacity back to the peer. This will result in sending
/// WINDOW_UPDATE frames on both the stream and connection.
pub fn release_capacity(&mut self, capacity: WindowSize)