fix(http2): send trailers if Payload includes them

This commit is contained in:
Sean McArthur
2018-06-05 17:27:09 -07:00
parent a096799c1b
commit 3affe2a0af

View File

@@ -30,6 +30,7 @@ where
S: Payload, S: Payload,
{ {
body_tx: SendStream<SendBuf<S::Data>>, body_tx: SendStream<SendBuf<S::Data>>,
data_done: bool,
stream: S, stream: S,
} }
@@ -40,9 +41,23 @@ where
fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> { fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
PipeToSendStream { PipeToSendStream {
body_tx: tx, body_tx: tx,
data_done: false,
stream: stream, stream: stream,
} }
} }
fn on_err(&mut self, err: S::Error) -> ::Error {
let err = ::Error::new_user_body(err);
trace!("send body user stream error: {}", err);
self.body_tx.send_reset(Reason::INTERNAL_ERROR);
err
}
fn send_eos_frame(&mut self) -> ::Result<()> {
trace!("send body eos");
self.body_tx.send_data(SendBuf(None), true)
.map_err(::Error::new_body_write)
}
} }
impl<S> Future for PipeToSendStream<S> impl<S> Future for PipeToSendStream<S>
@@ -54,6 +69,7 @@ where
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop { loop {
if !self.data_done {
// TODO: make use of flow control on SendStream // TODO: make use of flow control on SendStream
// If you're looking at this and thinking of trying to fix this TODO, // If you're looking at this and thinking of trying to fix this TODO,
// you may want to look at: // you may want to look at:
@@ -68,8 +84,8 @@ where
// - else: // - else:
// - try reserve a smallish amount of capacity // - try reserve a smallish amount of capacity
// - call self.body_tx.poll_capacity(), return if NotReady // - call self.body_tx.poll_capacity(), return if NotReady
match self.stream.poll_data() { match try_ready!(self.stream.poll_data().map_err(|e| self.on_err(e))) {
Ok(Async::Ready(Some(chunk))) => { Some(chunk) => {
let is_eos = self.stream.is_end_stream(); let is_eos = self.stream.is_end_stream();
trace!( trace!(
"send body chunk: {} bytes, eos={}", "send body chunk: {} bytes, eos={}",
@@ -85,18 +101,27 @@ where
return Ok(Async::Ready(())) return Ok(Async::Ready(()))
} }
}, },
Ok(Async::Ready(None)) => { None => {
trace!("send body eos"); let is_eos = self.stream.is_end_stream();
self.body_tx.send_data(SendBuf(None), true) if is_eos {
return self.send_eos_frame().map(Async::Ready);
} else {
self.data_done = true;
// loop again to poll_trailers
}
},
}
} else {
match try_ready!(self.stream.poll_trailers().map_err(|e| self.on_err(e))) {
Some(trailers) => {
self.body_tx.send_trailers(trailers)
.map_err(::Error::new_body_write)?; .map_err(::Error::new_body_write)?;
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
}, },
Ok(Async::NotReady) => return Ok(Async::NotReady), None => {
Err(err) => { // There were no trailers, so send an empty DATA frame...
let err = ::Error::new_user_body(err); return self.send_eos_frame().map(Async::Ready);
trace!("send body user stream error: {}", err); },
self.body_tx.send_reset(Reason::INTERNAL_ERROR);
return Err(err);
} }
} }
} }