diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 13d1312..7ae9139 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -141,18 +141,8 @@ impl Connection try!(self.streams.recv_data(frame)); } Some(Reset(frame)) => { - unimplemented!(); - /* trace!("recv RST_STREAM; frame={:?}", frame); - try!(self.streams.recv_reset(&frame)); - - let frame = Frame::Reset { - id: frame.stream_id(), - error: frame.reason(), - }; - - return Ok(Some(frame).into()); - */ + try!(self.streams.recv_reset(frame)); } Some(PushPromise(frame)) => { trace!("recv PUSH_PROMISE; frame={:?}", frame); diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 318dbc6..cfb813f 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -197,6 +197,17 @@ impl Recv Ok(()) } + pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) + -> Result<(), ConnectionError> + { + let err = ConnectionError::Proto(frame.reason()); + + // Notify the stream + stream.state.recv_err(&err); + stream.notify_recv(); + Ok(()) + } + pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream) { // Receive an error stream.state.recv_err(err); diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 57954a9..bbfee6d 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -115,10 +115,11 @@ impl Streams pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), ConnectionError> { - let id = frame.stream_id(); let mut me = self.inner.lock().unwrap(); let me = &mut *me; + let id = frame.stream_id(); + let mut stream = match me.store.find_mut(&id) { Some(stream) => stream, None => return Err(ProtocolError.into()), @@ -135,10 +136,26 @@ impl Streams Ok(()) } - pub fn recv_reset(&mut self, _frame: &frame::Reset) + pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), ConnectionError> { - unimplemented!(); + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let id = frame.stream_id(); + + let mut stream = match me.store.find_mut(&id) { + Some(stream) => stream, + // TODO: should this be an error? + None => return Ok(()), + }; + + me.actions.recv.recv_reset(frame, &mut stream)?; + + assert!(stream.state.is_closed()); + me.actions.dec_num_streams(id); + + Ok(()) } pub fn recv_err(&mut self, err: &ConnectionError) {