tests pass

This commit is contained in:
Oliver Gould
2017-07-22 21:16:53 +00:00
parent 7951def04d
commit f121f747ac
11 changed files with 73 additions and 80 deletions

View File

@@ -16,7 +16,6 @@ ordermap = "0.2.10"
[dev-dependencies] [dev-dependencies]
mock-io = { git = "https://github.com/carllerche/mock-io" } mock-io = { git = "https://github.com/carllerche/mock-io" }
test_futures = "0.0.1"
# Fuzzing # Fuzzing
quickcheck = "0.4.1" quickcheck = "0.4.1"

View File

@@ -72,6 +72,7 @@ impl<T, P, B> Connection<T, P, B>
end_of_stream: bool) end_of_stream: bool)
-> sink::Send<Self> -> sink::Send<Self>
{ {
trace!("send_data: id={:?}", id);
self.send(Frame::Data { self.send(Frame::Data {
id, id,
data, data,

View File

@@ -407,12 +407,12 @@ impl<T: ControlStreams> ControlStreams for FlowControl<T> {
self.inner.send_flow_controller(id) self.inner.send_flow_controller(id)
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_send_data(&mut self, id: StreamId) -> bool {
self.inner.check_can_send_data(id) self.inner.can_send_data(id)
} }
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_recv_data(&mut self, id: StreamId) -> bool {
self.inner.check_can_recv_data(id) self.inner.can_recv_data(id)
} }
} }

View File

@@ -155,7 +155,7 @@ impl<T, B> Sink for FramedWrite<T, B>
} }
fn poll_complete(&mut self) -> Poll<(), ConnectionError> { fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
trace!("FramedWrite::poll_complete"); trace!("poll_complete");
// TODO: implement // TODO: implement
match self.next { match self.next {
@@ -165,6 +165,7 @@ impl<T, B> Sink for FramedWrite<T, B>
// As long as there is data to write, try to write it! // As long as there is data to write, try to write it!
while !self.is_empty() { while !self.is_empty() {
trace!("writing {}", self.buf.remaining());
try_ready!(self.inner.write_buf(&mut self.buf)); try_ready!(self.inner.write_buf(&mut self.buf));
} }

View File

@@ -1,6 +1,5 @@
use ConnectionError; use ConnectionError;
use error::Reason::*; use error::Reason::*;
use error::User::*;
use proto::{FlowControlState, WindowSize}; use proto::{FlowControlState, WindowSize};
/// Represents the state of an H2 stream /// Represents the state of an H2 stream
@@ -156,40 +155,24 @@ impl StreamState {
Ok(true) Ok(true)
} }
pub fn check_can_send_data(&self) -> Result<(), ConnectionError> { pub fn can_send_data(&self) -> bool {
use self::StreamState::*; use self::StreamState::*;
match self { match self {
&Open { ref remote, .. } => { &Idle | &Closed | &HalfClosedRemote(..) => false,
try!(remote.check_streaming(UnexpectedFrameType.into()));
Ok(())
}
&HalfClosedLocal(ref remote) => { &Open { ref remote, .. } |
try!(remote.check_streaming(UnexpectedFrameType.into())); &HalfClosedLocal(ref remote) => remote.is_streaming(),
Ok(())
}
&Idle | &Closed | &HalfClosedRemote(..) => {
Err(UnexpectedFrameType.into())
}
} }
} }
pub fn check_can_recv_data(&self) -> Result<(), ConnectionError> { pub fn can_recv_data(&self) -> bool {
use self::StreamState::*; use self::StreamState::*;
match self { match self {
&Open { ref local, .. } => { &Idle | &Closed | &HalfClosedLocal(..) => false,
try!(local.check_streaming(ProtocolError.into()));
Ok(())
}
&Open { ref local, .. } |
&HalfClosedRemote(ref local) => { &HalfClosedRemote(ref local) => {
try!(local.check_streaming(ProtocolError.into())); local.is_streaming()
Ok(())
}
&Idle | &Closed | &HalfClosedLocal(..) => {
Err(ProtocolError.into())
} }
} }
} }
@@ -288,11 +271,11 @@ impl PeerState {
} }
#[inline] #[inline]
fn check_streaming(&self, err: ConnectionError) -> Result<(), ConnectionError> { fn is_streaming(&self) -> bool {
use self::PeerState::*; use self::PeerState::*;
match self { match self {
&Streaming(..) => Ok(()), &Streaming(..) => true,
_ => Err(err), _ => false,
} }
} }

View File

@@ -157,12 +157,12 @@ impl<T: ControlStreams> ControlStreams for StreamRecvClose<T> {
self.inner.send_flow_controller(id) self.inner.send_flow_controller(id)
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_send_data(&mut self, id: StreamId) -> bool {
self.inner.check_can_send_data(id) self.inner.can_send_data(id)
} }
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_recv_data(&mut self, id: StreamId) -> bool {
self.inner.check_can_recv_data(id) self.inner.can_recv_data(id)
} }
} }

View File

@@ -170,7 +170,9 @@ impl<T, U> Stream for StreamRecvOpen<T>
if let &Data(..) = &frame { if let &Data(..) = &frame {
// Ensures we've already received headers for this stream. // Ensures we've already received headers for this stream.
self.inner.check_can_recv_data(id)?; if !self.inner.can_recv_data(id) {
return Err(ProtocolError.into());
}
} }
// If the frame ends the stream, it will be handled in // If the frame ends the stream, it will be handled in
@@ -400,12 +402,12 @@ impl<T: ControlStreams> ControlStreams for StreamRecvOpen<T> {
self.inner.send_flow_controller(id) self.inner.send_flow_controller(id)
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_send_data(&mut self, id: StreamId) -> bool {
self.inner.check_can_send_data(id) self.inner.can_send_data(id)
} }
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_recv_data(&mut self, id: StreamId) -> bool {
self.inner.check_can_recv_data(id) self.inner.can_recv_data(id)
} }
} }

View File

@@ -162,12 +162,12 @@ impl<T: ControlStreams> ControlStreams for StreamSendClose<T> {
self.inner.send_flow_controller(id) self.inner.send_flow_controller(id)
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_send_data(&mut self, id: StreamId) -> bool {
self.inner.check_can_send_data(id) self.inner.can_send_data(id)
} }
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_recv_data(&mut self, id: StreamId) -> bool {
self.inner.check_can_recv_data(id) self.inner.can_recv_data(id)
} }
} }

View File

@@ -1,5 +1,5 @@
use ConnectionError; use ConnectionError;
use error::User::{InvalidStreamId, StreamReset, Rejected}; use error::User::{InactiveStreamId, InvalidStreamId, StreamReset, Rejected, UnexpectedFrameType};
use frame::{Frame, SettingSet}; use frame::{Frame, SettingSet};
use proto::*; use proto::*;
@@ -103,7 +103,11 @@ impl<T, U> Sink for StreamSendOpen<T>
} }
if T::local_valid_id(id) { if T::local_valid_id(id) {
if !self.inner.is_local_active(id) { if self.inner.is_local_active(id) {
if !self.inner.can_send_data(id) {
return Err(InactiveStreamId.into());
}
} else {
if !T::local_can_open() { if !T::local_can_open() {
return Err(InvalidStreamId.into()); return Err(InvalidStreamId.into());
} }
@@ -114,8 +118,11 @@ impl<T, U> Sink for StreamSendOpen<T>
} }
} }
trace!("creating new local stream"); if let &Frame::Headers(..) = &frame {
self.inner.local_open(id, self.initial_window_size)?; self.inner.local_open(id, self.initial_window_size)?;
} else {
return Err(InactiveStreamId.into());
}
} }
} else { } else {
// If the frame is part of a remote stream, it MUST already exist. // If the frame is part of a remote stream, it MUST already exist.
@@ -130,7 +137,9 @@ impl<T, U> Sink for StreamSendOpen<T>
if let &Frame::Data(..) = &frame { if let &Frame::Data(..) = &frame {
// Ensures we've already sent headers for this stream. // Ensures we've already sent headers for this stream.
self.inner.check_can_send_data(id)?; if !self.inner.can_send_data(id) {
return Err(InactiveStreamId.into());
}
} }
trace!("sending frame..."); trace!("sending frame...");
@@ -230,12 +239,12 @@ impl<T: ControlStreams> ControlStreams for StreamSendOpen<T> {
self.inner.send_flow_controller(id) self.inner.send_flow_controller(id)
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_send_data(&mut self, id: StreamId) -> bool {
self.inner.check_can_send_data(id) self.inner.can_send_data(id)
} }
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_recv_data(&mut self, id: StreamId) -> bool {
self.inner.check_can_recv_data(id) self.inner.can_recv_data(id)
} }
} }

View File

@@ -76,8 +76,8 @@ pub trait ControlStreams {
fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32); fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32);
fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32); fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32);
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError>; fn can_send_data(&mut self, id: StreamId) -> bool;
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError>; fn can_recv_data(&mut self, id: StreamId) -> bool;
} }
/// Holds the underlying stream state to be accessed by upper layers. /// Holds the underlying stream state to be accessed by upper layers.
@@ -366,18 +366,18 @@ impl<T, P: Peer> ControlStreams for StreamStore<T, P> {
} }
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_send_data(&mut self, id: StreamId) -> bool {
if let Some(s) = self.get_active(id) { match self.get_active(id) {
return s.check_can_send_data(); Some(s) => s.can_send_data(),
None => false,
} }
Err(ProtocolError.into())
} }
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { fn can_recv_data(&mut self, id: StreamId) -> bool {
if let Some(s) = self.get_active(id) { match self.get_active(id) {
return s.check_can_recv_data(); Some(s) => s.can_recv_data(),
None => false,
} }
Err(ProtocolError.into())
} }
} }

View File

@@ -6,7 +6,6 @@ extern crate futures;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate mock_io; extern crate mock_io;
extern crate test_futures;
// scoped so `cargo test client_request` dtrt. // scoped so `cargo test client_request` dtrt.
mod client_request { mod client_request {
@@ -16,7 +15,6 @@ mod client_request {
use futures::*; use futures::*;
use bytes::Bytes; use bytes::Bytes;
use mock_io; use mock_io;
use test_futures::*;
// TODO: move into another file // TODO: move into another file
macro_rules! assert_user_err { macro_rules! assert_user_err {
@@ -50,12 +48,12 @@ mod client_request {
.write(SETTINGS_ACK) .write(SETTINGS_ACK)
.build(); .build();
let mut h2 = client::handshake(mock) let h2 = client::handshake(mock)
.wait().unwrap(); .wait().unwrap();
trace!("hands have been shook"); trace!("hands have been shook");
// At this point, the connection should be closed // At this point, the connection should be closed
sassert_done(&mut h2); assert!(Stream::wait(h2).next().is_none());
} }
#[test] #[test]
@@ -87,7 +85,7 @@ mod client_request {
// Get the response // Get the response
trace!("getting response"); trace!("getting response");
let (resp, mut h2) = h2.into_future().wait().unwrap(); let (resp, h2) = h2.into_future().wait().unwrap();
match resp.unwrap() { match resp.unwrap() {
Frame::Headers { headers, .. } => { Frame::Headers { headers, .. } => {
@@ -98,7 +96,7 @@ mod client_request {
// No more frames // No more frames
trace!("ensure no more responses"); trace!("ensure no more responses");
sassert_done(&mut h2); assert!(Stream::wait(h2).next().is_none());;
} }
#[test] #[test]
@@ -151,7 +149,7 @@ mod client_request {
} }
// Get the response body // Get the response body
let (data, mut h2) = h2.into_future().wait().unwrap(); let (data, h2) = h2.into_future().wait().unwrap();
match data.unwrap() { match data.unwrap() {
Frame::Data { id, data, end_of_stream, .. } => { Frame::Data { id, data, end_of_stream, .. } => {
@@ -162,7 +160,7 @@ mod client_request {
_ => panic!("unexpected frame"), _ => panic!("unexpected frame"),
} }
sassert_done(&mut h2); assert!(Stream::wait(h2).next().is_none());;
} }
#[test] #[test]
@@ -215,7 +213,7 @@ mod client_request {
} }
// Get the response body // Get the response body
let (data, mut h2) = h2.into_future().wait().unwrap(); let (data, h2) = h2.into_future().wait().unwrap();
match data.unwrap() { match data.unwrap() {
Frame::Data { id, data, end_of_stream, .. } => { Frame::Data { id, data, end_of_stream, .. } => {
@@ -226,7 +224,7 @@ mod client_request {
_ => panic!("unexpected frame"), _ => panic!("unexpected frame"),
} }
sassert_done(&mut h2); assert!(Stream::wait(h2).next().is_none());;
} }
#[test] #[test]
@@ -295,7 +293,7 @@ mod client_request {
} }
// Get the response body // Get the response body
let (data, mut h2) = h2.into_future().wait().expect("into future"); let (data, h2) = h2.into_future().wait().expect("into future");
match data.expect("response data") { match data.expect("response data") {
Frame::Data { id, data, end_of_stream, .. } => { Frame::Data { id, data, end_of_stream, .. } => {
@@ -306,7 +304,7 @@ mod client_request {
_ => panic!("unexpected frame"), _ => panic!("unexpected frame"),
} }
sassert_done(&mut h2); assert!(Stream::wait(h2).next().is_none());;
} }
#[test] #[test]
@@ -352,7 +350,7 @@ mod client_request {
request.uri = "https://http2.akamai.com/".parse().unwrap(); request.uri = "https://http2.akamai.com/".parse().unwrap();
let err = h2.send_request(1.into(), request, true).wait().unwrap_err(); let err = h2.send_request(1.into(), request, true).wait().unwrap_err();
assert_user_err!(err, UnexpectedFrameType); assert_user_err!(err, InactiveStreamId);
} }
#[test] #[test]
@@ -399,7 +397,7 @@ mod client_request {
// Send the data // Send the data
let err = h2.send_data(id, body.into(), true).wait().unwrap_err(); let err = h2.send_data(id, body.into(), true).wait().unwrap_err();
assert_user_err!(err, UnexpectedFrameType); assert_user_err!(err, InactiveStreamId);
} }
#[test] #[test]