diff --git a/src/proto/connection.rs b/src/proto/connection.rs index a28ce80..271b718 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -92,6 +92,16 @@ impl Connection /// Advances the internal state of the connection. pub fn poll(&mut self) -> Poll<(), ConnectionError> { + match self.poll2() { + Err(e) => { + self.streams.recv_err(&e); + Err(e) + } + ret => ret, + } + } + + fn poll2(&mut self) -> Poll<(), ConnectionError> { use frame::Frame::*; loop { @@ -178,9 +188,6 @@ impl Connection } } } - - // TODO: Flush the write buffer - unimplemented!(); } /* diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 0c17434..8f319c0 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -143,6 +143,14 @@ impl Recv Ok(()) } + pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream) { + // Receive an error + stream.state.recv_err(err); + + // If a receiver is waiting, notify it + stream.notify_recv(); + } + pub fn dec_num_streams(&mut self) { self.num_streams -= 1; } @@ -308,6 +316,8 @@ impl Recv } Some(frame) => unimplemented!(), None => { + stream.state.ensure_recv_open()?; + stream.recv_task = Some(task::current()); Ok(Async::NotReady) } diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index e6b681c..b39bd14 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -66,7 +66,7 @@ enum Inner { HalfClosedLocal(Peer), // TODO: explicitly name this value HalfClosedRemote(Peer), // When reset, a reason is provided - Closed(Option), + Closed(Option), } #[derive(Debug, Copy, Clone)] @@ -76,6 +76,12 @@ enum Peer { Streaming(FlowControl), } +#[derive(Debug, Copy, Clone)] +enum Cause { + Proto(Reason), + Io, +} + impl State { /// Opens the send-half of a stream if it is not already open. pub fn send_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> { @@ -178,6 +184,19 @@ impl State { } } + pub fn recv_err(&mut self, err: &ConnectionError) { + match self.inner { + Closed(..) => {} + _ => { + self.inner = Closed(match *err { + ConnectionError::Proto(reason) => Some(Cause::Proto(reason)), + ConnectionError::Io(..) => Some(Cause::Io), + _ => panic!("cannot terminate stream with user error"), + }); + } + } + } + /// Indicates that the local side will not send more data to the local. pub fn send_close(&mut self) -> Result<(), ConnectionError> { match self.inner { @@ -225,6 +244,21 @@ impl State { _ => None, } } + + pub fn ensure_recv_open(&self) -> Result<(), ConnectionError> { + use std::io; + + // TODO: Is this correct? + match self.inner { + Closed(Some(Cause::Proto(reason))) => { + Err(ConnectionError::Proto(reason)) + } + Closed(Some(Cause::Io)) => { + Err(ConnectionError::Io(io::ErrorKind::BrokenPipe.into())) + } + _ => Ok(()), + } + } } impl Default for State { diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index e4af181..6821ff8 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -90,6 +90,14 @@ impl Store { } } } + + pub fn for_each(&mut self, mut f: F) + where F: FnMut(&mut Stream) + { + for &id in self.ids.values() { + f(&mut self.slab[id]) + } + } } // ===== impl Ptr ===== @@ -142,7 +150,7 @@ impl<'a, B> OccupiedEntry<'a, B> { } // ===== impl VacantEntry ===== -// + impl<'a, B> VacantEntry<'a, B> { pub fn insert(self, value: Stream) -> Key { // Insert the value in the slab diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index f79e341..edfd058 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -141,6 +141,14 @@ impl Streams unimplemented!(); } + pub fn recv_err(&mut self, err: &ConnectionError) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let actions = &mut me.actions; + me.store.for_each(|stream| actions.recv.recv_err(err, stream)); + } + pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), ConnectionError> { let id = frame.stream_id(); diff --git a/tests/client_request.rs b/tests/client_request.rs index 6e897eb..4db144e 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -14,78 +14,13 @@ fn handshake() { .write(SETTINGS_ACK) .build(); - let h2 = client::handshake(mock) + let h2 = Client::handshake(mock) .wait().unwrap(); + trace!("hands have been shook"); // At this point, the connection should be closed - assert!(Stream::wait(h2).next().is_none()); -} - - -#[test] -fn send_request_with_zero_stream_id() { - let mock = mock_io::Builder::new() - .handshake() - .build(); - - let h2 = client::handshake(mock) - .wait().unwrap(); - - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - - let err = h2.send_request(0.into(), request, true).wait().unwrap_err(); - assert_user_err!(err, InvalidStreamId); -} - -#[test] -fn send_request_with_server_stream_id() { - let mock = mock_io::Builder::new() - .handshake() - .build(); - - let h2 = client::handshake(mock) - .wait().unwrap(); - - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - - let err = h2.send_request(2.into(), request, true).wait().unwrap_err(); - assert_user_err!(err, InvalidStreamId); -} - -#[test] -#[ignore] -fn request_without_scheme() { -} - -#[test] -#[ignore] -fn request_with_h1_version() { -} - -#[test] -fn send_invalid_client_stream_id() { - let _ = ::env_logger::init(); - - for &id in &[0, 2] { - let mock = mock_io::Builder::new() - .handshake() - .build(); - - let h2 = client::handshake(mock) - .wait().unwrap(); - - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let err = h2.send_request(id.into(), request, true).wait().unwrap_err(); - - assert_user_err!(err, InvalidStreamId); - } + h2.wait().unwrap(); } #[test] @@ -104,19 +39,35 @@ fn recv_invalid_server_stream_id() { .read(&[0, 0, 1, 1, 5, 0, 0, 0, 2, 137]) .build(); - let h2 = client::handshake(mock) + let mut h2 = Client::handshake(mock) .wait().unwrap(); // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); - // Get the response - let (err, _) = h2.into_future().wait().unwrap_err(); - assert_proto_err!(err, ProtocolError); + info!("sending request"); + let mut stream = h2.request(request, true).unwrap(); + + // The connection errors + assert_proto_err!(h2.wait().unwrap_err(), ProtocolError); + + // The stream errors + assert_proto_err!(stream.wait().unwrap_err(), ProtocolError); } +#[test] +#[ignore] +fn request_without_scheme() { +} + +#[test] +#[ignore] +fn request_with_h1_version() { +} + + #[test] #[ignore] fn sending_request_on_closed_soket() { diff --git a/tests/stream_states.rs b/tests/stream_states.rs index 224a2a3..6ef557a 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -97,37 +97,6 @@ fn send_recv_data() { // The H2 connection is closed h2.wait().unwrap(); - - /* - let b = "hello"; - - // Send the data - let h2 = h2.send_data(1.into(), b.into(), true).wait().expect("send data"); - - // Get the response headers - let (resp, h2) = h2.into_future().wait().expect("into future"); - - match resp.expect("response headers") { - Frame::Headers { headers, .. } => { - assert_eq!(headers.status, status::OK); - } - _ => panic!("unexpected frame"), - } - - // Get the response body - let (data, h2) = h2.into_future().wait().expect("into future"); - - match data.expect("response data") { - Frame::Data { id, data, end_of_stream, .. } => { - assert_eq!(id, 1.into()); - assert_eq!(data, &b"world"[..]); - assert!(end_of_stream); - } - _ => panic!("unexpected frame"), - } - - assert!(Stream::wait(h2).next().is_none());; - */ } #[test] @@ -185,35 +154,6 @@ fn send_headers_recv_data_single_frame() { } /* -#[test] -fn send_headers_twice_with_same_stream_id() { - let _ = env_logger::init(); - - let mock = mock_io::Builder::new() - .handshake() - // Write GET / - .write(&[ - 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, - 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, - ]) - .build(); - - let h2 = client::handshake(mock) - .wait().unwrap(); - - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); - - // Send another request with the same stream ID - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let err = h2.send_request(1.into(), request, true).wait().unwrap_err(); - - assert_user_err!(err, UnexpectedFrameType); -} - #[test] fn send_data_after_headers_eos() { let _ = env_logger::init(); @@ -246,21 +186,6 @@ fn send_data_after_headers_eos() { assert_user_err!(err, UnexpectedFrameType); } -#[test] -fn send_data_without_headers() { - let mock = mock_io::Builder::new() - .handshake() - .build(); - - let h2 = client::handshake(mock) - .wait().unwrap(); - - let b = Bytes::from_static(b"hello world"); - let err = h2.send_data(1.into(), b, true).wait().unwrap_err(); - - assert_user_err!(err, UnexpectedFrameType); -} - #[test] #[ignore] fn exceed_max_streams() {