From f121f747acb333ca255daab7fcda087c59356559 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 22 Jul 2017 21:16:53 +0000 Subject: [PATCH] tests pass --- Cargo.toml | 1 - src/proto/connection.rs | 1 + src/proto/flow_control.rs | 8 +++---- src/proto/framed_write.rs | 3 ++- src/proto/state.rs | 39 ++++++++++------------------------ src/proto/stream_recv_close.rs | 8 +++---- src/proto/stream_recv_open.rs | 12 ++++++----- src/proto/stream_send_close.rs | 8 +++---- src/proto/stream_send_open.rs | 27 +++++++++++++++-------- src/proto/stream_store.rs | 20 ++++++++--------- tests/client_request.rs | 26 +++++++++++------------ 11 files changed, 73 insertions(+), 80 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b5a7d54..91d1565 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ ordermap = "0.2.10" [dev-dependencies] mock-io = { git = "https://github.com/carllerche/mock-io" } -test_futures = "0.0.1" # Fuzzing quickcheck = "0.4.1" diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 10ae35f..9b88e03 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -72,6 +72,7 @@ impl Connection end_of_stream: bool) -> sink::Send { + trace!("send_data: id={:?}", id); self.send(Frame::Data { id, data, diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 2ac8cc0..d05bed7 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -407,12 +407,12 @@ impl ControlStreams for FlowControl { self.inner.send_flow_controller(id) } - fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.check_can_send_data(id) + fn can_send_data(&mut self, id: StreamId) -> bool { + self.inner.can_send_data(id) } - fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.check_can_recv_data(id) + fn can_recv_data(&mut self, id: StreamId) -> bool { + self.inner.can_recv_data(id) } } diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index 6a1ee6f..ca9254e 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -155,7 +155,7 @@ impl Sink for FramedWrite } fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - trace!("FramedWrite::poll_complete"); + trace!("poll_complete"); // TODO: implement match self.next { @@ -165,6 +165,7 @@ impl Sink for FramedWrite // As long as there is data to write, try to write it! while !self.is_empty() { + trace!("writing {}", self.buf.remaining()); try_ready!(self.inner.write_buf(&mut self.buf)); } diff --git a/src/proto/state.rs b/src/proto/state.rs index 1817f25..a129b46 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -1,6 +1,5 @@ use ConnectionError; use error::Reason::*; -use error::User::*; use proto::{FlowControlState, WindowSize}; /// Represents the state of an H2 stream @@ -156,40 +155,24 @@ impl StreamState { Ok(true) } - pub fn check_can_send_data(&self) -> Result<(), ConnectionError> { + pub fn can_send_data(&self) -> bool { use self::StreamState::*; match self { - &Open { ref remote, .. } => { - try!(remote.check_streaming(UnexpectedFrameType.into())); - Ok(()) - } + &Idle | &Closed | &HalfClosedRemote(..) => false, - &HalfClosedLocal(ref remote) => { - try!(remote.check_streaming(UnexpectedFrameType.into())); - Ok(()) - } - - &Idle | &Closed | &HalfClosedRemote(..) => { - Err(UnexpectedFrameType.into()) - } + &Open { ref remote, .. } | + &HalfClosedLocal(ref remote) => remote.is_streaming(), } } - pub fn check_can_recv_data(&self) -> Result<(), ConnectionError> { + pub fn can_recv_data(&self) -> bool { use self::StreamState::*; match self { - &Open { ref local, .. } => { - try!(local.check_streaming(ProtocolError.into())); - Ok(()) - } + &Idle | &Closed | &HalfClosedLocal(..) => false, + &Open { ref local, .. } | &HalfClosedRemote(ref local) => { - try!(local.check_streaming(ProtocolError.into())); - Ok(()) - } - - &Idle | &Closed | &HalfClosedLocal(..) => { - Err(ProtocolError.into()) + local.is_streaming() } } } @@ -288,11 +271,11 @@ impl PeerState { } #[inline] - fn check_streaming(&self, err: ConnectionError) -> Result<(), ConnectionError> { + fn is_streaming(&self) -> bool { use self::PeerState::*; match self { - &Streaming(..) => Ok(()), - _ => Err(err), + &Streaming(..) => true, + _ => false, } } diff --git a/src/proto/stream_recv_close.rs b/src/proto/stream_recv_close.rs index 6db2bce..059c3d4 100644 --- a/src/proto/stream_recv_close.rs +++ b/src/proto/stream_recv_close.rs @@ -157,12 +157,12 @@ impl ControlStreams for StreamRecvClose { self.inner.send_flow_controller(id) } - fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.check_can_send_data(id) + fn can_send_data(&mut self, id: StreamId) -> bool { + self.inner.can_send_data(id) } - fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.check_can_recv_data(id) + fn can_recv_data(&mut self, id: StreamId) -> bool { + self.inner.can_recv_data(id) } } diff --git a/src/proto/stream_recv_open.rs b/src/proto/stream_recv_open.rs index fed4020..817d710 100644 --- a/src/proto/stream_recv_open.rs +++ b/src/proto/stream_recv_open.rs @@ -170,7 +170,9 @@ impl Stream for StreamRecvOpen if let &Data(..) = &frame { // Ensures we've already received headers for this stream. - self.inner.check_can_recv_data(id)?; + if !self.inner.can_recv_data(id) { + return Err(ProtocolError.into()); + } } // If the frame ends the stream, it will be handled in @@ -400,12 +402,12 @@ impl ControlStreams for StreamRecvOpen { self.inner.send_flow_controller(id) } - fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.check_can_send_data(id) + fn can_send_data(&mut self, id: StreamId) -> bool { + self.inner.can_send_data(id) } - fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.check_can_recv_data(id) + fn can_recv_data(&mut self, id: StreamId) -> bool { + self.inner.can_recv_data(id) } } diff --git a/src/proto/stream_send_close.rs b/src/proto/stream_send_close.rs index 00acea6..53cf25c 100644 --- a/src/proto/stream_send_close.rs +++ b/src/proto/stream_send_close.rs @@ -162,12 +162,12 @@ impl ControlStreams for StreamSendClose { self.inner.send_flow_controller(id) } - fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.check_can_send_data(id) + fn can_send_data(&mut self, id: StreamId) -> bool { + self.inner.can_send_data(id) } - fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.check_can_recv_data(id) + fn can_recv_data(&mut self, id: StreamId) -> bool { + self.inner.can_recv_data(id) } } diff --git a/src/proto/stream_send_open.rs b/src/proto/stream_send_open.rs index f59e017..2dfa6f5 100644 --- a/src/proto/stream_send_open.rs +++ b/src/proto/stream_send_open.rs @@ -1,5 +1,5 @@ use ConnectionError; -use error::User::{InvalidStreamId, StreamReset, Rejected}; +use error::User::{InactiveStreamId, InvalidStreamId, StreamReset, Rejected, UnexpectedFrameType}; use frame::{Frame, SettingSet}; use proto::*; @@ -103,7 +103,11 @@ impl Sink for StreamSendOpen } if T::local_valid_id(id) { - if !self.inner.is_local_active(id) { + if self.inner.is_local_active(id) { + if !self.inner.can_send_data(id) { + return Err(InactiveStreamId.into()); + } + } else { if !T::local_can_open() { return Err(InvalidStreamId.into()); } @@ -114,8 +118,11 @@ impl Sink for StreamSendOpen } } - trace!("creating new local stream"); - self.inner.local_open(id, self.initial_window_size)?; + if let &Frame::Headers(..) = &frame { + self.inner.local_open(id, self.initial_window_size)?; + } else { + return Err(InactiveStreamId.into()); + } } } else { // If the frame is part of a remote stream, it MUST already exist. @@ -130,7 +137,9 @@ impl Sink for StreamSendOpen if let &Frame::Data(..) = &frame { // Ensures we've already sent headers for this stream. - self.inner.check_can_send_data(id)?; + if !self.inner.can_send_data(id) { + return Err(InactiveStreamId.into()); + } } trace!("sending frame..."); @@ -230,12 +239,12 @@ impl ControlStreams for StreamSendOpen { self.inner.send_flow_controller(id) } - fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.check_can_send_data(id) + fn can_send_data(&mut self, id: StreamId) -> bool { + self.inner.can_send_data(id) } - fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.check_can_recv_data(id) + fn can_recv_data(&mut self, id: StreamId) -> bool { + self.inner.can_recv_data(id) } } diff --git a/src/proto/stream_store.rs b/src/proto/stream_store.rs index 078e00a..597d0ea 100644 --- a/src/proto/stream_store.rs +++ b/src/proto/stream_store.rs @@ -76,8 +76,8 @@ pub trait ControlStreams { fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32); fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32); - fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError>; - fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError>; + fn can_send_data(&mut self, id: StreamId) -> bool; + fn can_recv_data(&mut self, id: StreamId) -> bool; } /// Holds the underlying stream state to be accessed by upper layers. @@ -366,18 +366,18 @@ impl ControlStreams for StreamStore { } } - fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - if let Some(s) = self.get_active(id) { - return s.check_can_send_data(); + fn can_send_data(&mut self, id: StreamId) -> bool { + match self.get_active(id) { + Some(s) => s.can_send_data(), + None => false, } - Err(ProtocolError.into()) } - fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { - if let Some(s) = self.get_active(id) { - return s.check_can_recv_data(); + fn can_recv_data(&mut self, id: StreamId) -> bool { + match self.get_active(id) { + Some(s) => s.can_recv_data(), + None => false, } - Err(ProtocolError.into()) } } diff --git a/tests/client_request.rs b/tests/client_request.rs index 5dbe4b5..e9e97aa 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -6,7 +6,6 @@ extern crate futures; #[macro_use] extern crate log; extern crate mock_io; -extern crate test_futures; // scoped so `cargo test client_request` dtrt. mod client_request { @@ -16,7 +15,6 @@ mod client_request { use futures::*; use bytes::Bytes; use mock_io; - use test_futures::*; // TODO: move into another file macro_rules! assert_user_err { @@ -50,12 +48,12 @@ mod client_request { .write(SETTINGS_ACK) .build(); - let mut h2 = client::handshake(mock) + let h2 = client::handshake(mock) .wait().unwrap(); trace!("hands have been shook"); // At this point, the connection should be closed - sassert_done(&mut h2); + assert!(Stream::wait(h2).next().is_none()); } #[test] @@ -87,7 +85,7 @@ mod client_request { // Get the response trace!("getting response"); - let (resp, mut h2) = h2.into_future().wait().unwrap(); + let (resp, h2) = h2.into_future().wait().unwrap(); match resp.unwrap() { Frame::Headers { headers, .. } => { @@ -98,7 +96,7 @@ mod client_request { // No more frames trace!("ensure no more responses"); - sassert_done(&mut h2); + assert!(Stream::wait(h2).next().is_none());; } #[test] @@ -151,7 +149,7 @@ mod client_request { } // Get the response body - let (data, mut h2) = h2.into_future().wait().unwrap(); + let (data, h2) = h2.into_future().wait().unwrap(); match data.unwrap() { Frame::Data { id, data, end_of_stream, .. } => { @@ -162,7 +160,7 @@ mod client_request { _ => panic!("unexpected frame"), } - sassert_done(&mut h2); + assert!(Stream::wait(h2).next().is_none());; } #[test] @@ -215,7 +213,7 @@ mod client_request { } // Get the response body - let (data, mut h2) = h2.into_future().wait().unwrap(); + let (data, h2) = h2.into_future().wait().unwrap(); match data.unwrap() { Frame::Data { id, data, end_of_stream, .. } => { @@ -226,7 +224,7 @@ mod client_request { _ => panic!("unexpected frame"), } - sassert_done(&mut h2); + assert!(Stream::wait(h2).next().is_none());; } #[test] @@ -295,7 +293,7 @@ mod client_request { } // Get the response body - let (data, mut h2) = h2.into_future().wait().expect("into future"); + let (data, h2) = h2.into_future().wait().expect("into future"); match data.expect("response data") { Frame::Data { id, data, end_of_stream, .. } => { @@ -306,7 +304,7 @@ mod client_request { _ => panic!("unexpected frame"), } - sassert_done(&mut h2); + assert!(Stream::wait(h2).next().is_none());; } #[test] @@ -352,7 +350,7 @@ mod client_request { request.uri = "https://http2.akamai.com/".parse().unwrap(); let err = h2.send_request(1.into(), request, true).wait().unwrap_err(); - assert_user_err!(err, UnexpectedFrameType); + assert_user_err!(err, InactiveStreamId); } #[test] @@ -399,7 +397,7 @@ mod client_request { // Send the data let err = h2.send_data(id, body.into(), true).wait().unwrap_err(); - assert_user_err!(err, UnexpectedFrameType); + assert_user_err!(err, InactiveStreamId); } #[test]