when receiving a GOAWAY, allow earlier streams to still process (#133)

Once all active streams have finished, send a GOAWAY back and close the
connection.
This commit is contained in:
Sean McArthur
2017-10-05 15:32:13 -07:00
committed by GitHub
parent c4ca8f7def
commit ecd2764f4b
6 changed files with 174 additions and 31 deletions

View File

@@ -20,6 +20,12 @@ where
/// Tracks the connection level state transitions.
state: State,
/// An error to report back once complete.
///
/// This exists separately from State in order to support
/// graceful shutdown.
error: Option<Reason>,
/// Read / write frame values
codec: Codec<T, Prioritized<B::Buf>>,
@@ -41,14 +47,14 @@ enum State {
/// Currently open in a sane state
Open,
/// Waiting to send a GO_AWAY frame
/// Waiting to send a GOAWAY frame
GoAway(frame::GoAway),
/// The codec must be flushed
Flush(Reason),
/// In an errored state
Error(Reason),
/// In a closed state
Closed(Reason),
}
impl<T, P, B> Connection<T, P, B>
@@ -74,6 +80,7 @@ where
});
Connection {
state: State::Open,
error: None,
codec: codec,
ping_pong: PingPong::new(),
settings: Settings::new(),
@@ -118,10 +125,19 @@ where
// This will also handle flushing `self.codec`
try_ready!(self.streams.poll_complete(&mut self.codec));
if self.error.is_some() {
if self.streams.num_active_streams() == 0 {
let id = self.streams.last_processed_id();
let goaway = frame::GoAway::new(id, Reason::NoError);
self.state = State::GoAway(goaway);
continue;
}
}
return Ok(Async::NotReady);
},
// Attempting to read a frame resulted in a connection level
// error. This is handled by setting a GO_AWAY frame followed by
// error. This is handled by setting a GOAWAY frame followed by
// terminating the connection.
Err(Connection(e)) => {
debug!("Connection::poll; err={:?}", e);
@@ -164,24 +180,45 @@ where
// Ensure the codec is ready to accept the frame
try_ready!(self.codec.poll_ready());
// Buffer the GO_AWAY frame
// Buffer the GOAWAY frame
self.codec
.buffer(frame.into())
.ok()
.expect("invalid GO_AWAY frame");
// GO_AWAY sent, transition the connection to an errored state
self.state = State::Flush(frame.reason());
// GOAWAY sent, transition the connection to a closed state
// Determine what error code should be returned to user.
let reason = if let Some(theirs) = self.error.take() {
let ours = frame.reason();
match (ours, theirs) {
// If either side reported an error, return that
// to the user.
(Reason::NoError, err) |
(err, Reason::NoError) => err,
// If both sides reported an error, give their
// error back to th user. We assume our error
// was a consequence of their error, and less
// important.
(_, theirs) => theirs,
}
} else {
frame.reason()
};
self.state = State::Flush(reason);
},
State::Flush(reason) => {
// Flush the codec
try_ready!(self.codec.flush());
// Transition the state to error
self.state = State::Error(reason);
self.state = State::Closed(reason);
},
State::Error(reason) => {
return Err(reason.into());
State::Closed(reason) => {
if let Reason::NoError = reason {
return Ok(Async::Ready(()));
} else {
return Err(reason.into());
}
},
}
}
@@ -215,11 +252,14 @@ where
trace!("recv SETTINGS; frame={:?}", frame);
self.settings.recv_settings(frame);
},
Some(GoAway(_)) => {
// TODO: handle the last_processed_id. Also, should this be
// handled as an error?
// let _ = RecvError::Proto(frame.reason());
return Ok(().into());
Some(GoAway(frame)) => {
trace!("recv GOAWAY; frame={:?}", frame);
// This should prevent starting new streams,
// but should allow continuing to process current streams
// until they are all EOS. Once they are, State should
// transition to GoAway.
self.streams.recv_goaway(&frame);
self.error = Some(frame.reason());
},
Some(Ping(frame)) => {
trace!("recv PING; frame={:?}", frame);

View File

@@ -101,22 +101,14 @@ where
// Transition the state
stream.state.set_reset(reason);
// Clear all pending outbound frames
self.prioritize.clear_queue(stream);
// Reclaim all capacity assigned to the stream and re-assign it to the
// connection
let available = stream.send_flow.available();
stream.send_flow.claim_capacity(available);
self.recv_err(stream);
let frame = frame::Reset::new(stream.id, reason);
trace!("send_reset -- queueing; frame={:?}", frame);
self.prioritize.queue_frame(frame.into(), stream, task);
// Re-assign all capacity to the connection
self.prioritize
.assign_connection_capacity(available, stream);
}
pub fn send_data(
@@ -221,6 +213,19 @@ where
Ok(())
}
pub fn recv_err(&mut self, stream: &mut store::Ptr<B, P>) {
// Clear all pending outbound frames
self.prioritize.clear_queue(stream);
// Reclaim all capacity assigned to the stream and re-assign it to the
// connection
let available = stream.send_flow.available();
stream.send_flow.claim_capacity(available);
// Re-assign all capacity to the connection
self.prioritize
.assign_connection_capacity(available, stream);
}
pub fn apply_remote_settings(
&mut self,
settings: &frame::Settings,

View File

@@ -222,7 +222,6 @@ where
}
}
#[cfg(feature = "unstable")]
impl<B, P> Store<B, P>
where
P: Peer,
@@ -231,6 +230,7 @@ where
self.ids.len()
}
#[cfg(feature = "unstable")]
pub fn num_wired_streams(&self) -> usize {
self.slab.len()
}

View File

@@ -192,6 +192,7 @@ where
.for_each(|stream| {
counts.transition(stream, |_, stream| {
actions.recv.recv_err(err, &mut *stream);
actions.send.recv_err(stream);
Ok::<_, ()>(())
})
})
@@ -202,6 +203,37 @@ where
last_processed_id
}
pub fn recv_goaway(&mut self, frame: &frame::GoAway) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let actions = &mut me.actions;
let counts = &mut me.counts;
let last_stream_id = frame.last_stream_id();
let err = frame.reason().into();
me.store
.for_each(|stream| {
if stream.id > last_stream_id {
counts.transition(stream, |_, stream| {
actions.recv.recv_err(&err, &mut *stream);
actions.send.recv_err(stream);
Ok::<_, ()>(())
})
} else {
Ok::<_, ()>(())
}
})
.unwrap();
actions.conn_error = Some(err);
}
pub fn last_processed_id(&self) -> StreamId {
self.inner.lock().unwrap().actions.recv.last_processed_id()
}
pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), RecvError> {
let id = frame.stream_id();
let mut me = self.inner.lock().unwrap();
@@ -446,7 +478,6 @@ where
}
}
#[cfg(feature = "unstable")]
impl<B, P> Streams<B, P>
where
B: Buf,
@@ -457,6 +488,7 @@ where
me.store.num_active_streams()
}
#[cfg(feature = "unstable")]
pub fn num_wired_streams(&self) -> usize {
let me = self.inner.lock().unwrap();
me.store.num_wired_streams()