diff --git a/src/client.rs b/src/client.rs index 7ae70c0..ccec356 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,7 +3,7 @@ use proto::{self, Connection}; use error::Reason::*; use http::{self, Request, Response}; -use futures::{Future, Poll, Sink, AsyncSink}; +use futures::{self, Future, Poll, Sink, AsyncSink}; use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{Bytes, IntoBuf}; @@ -99,6 +99,19 @@ impl Client } } +impl Future for Client + // TODO: Get rid of 'static + where T: AsyncRead + AsyncWrite + 'static, + B: IntoBuf + 'static, +{ + type Item = (); + type Error = ConnectionError; + + fn poll(&mut self) -> Poll<(), ConnectionError> { + self.connection.poll() + } +} + impl fmt::Debug for Client where T: fmt::Debug, B: fmt::Debug + IntoBuf, @@ -167,6 +180,28 @@ impl Future for Stream { } } +// ===== impl Body ===== + +impl futures::Stream for Body { + type Item = Chunk; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, Self::Error> { + let chunk = try_ready!(self.inner.poll_data()) + .map(|inner| Chunk { inner }); + + Ok(chunk.into()) + } +} + +// ===== impl Chunk ===== + +impl Chunk { + pub fn pop_bytes(&mut self) -> Option { + self.inner.pop_bytes() + } +} + // ===== impl Peer ===== impl proto::Peer for Peer { diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 8a6d4d1..f7ceba9 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -236,8 +236,12 @@ impl Recv .take_while(&mut self.buffer, |frame| frame.is_data()); if frames.is_empty() { - stream.recv_task = Some(task::current()); - Ok(Async::NotReady) + if stream.state.is_recv_closed() { + Ok(None.into()) + } else { + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } } else { Ok(Some(Chunk { pending_recv: frames, diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 369ac8c..e6b681c 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -203,6 +203,13 @@ impl State { } } + pub fn is_recv_closed(&self) -> bool { + match self.inner { + Closed(..) | HalfClosedRemote(..) => true, + _ => false, + } + } + pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> { match self.inner { Open { ref mut remote, .. } | diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 23a44be..9f59a2d 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -368,6 +368,19 @@ impl Clone for StreamRef { // ===== impl Chunk ===== +impl Chunk + where P: Peer, + B: Buf, +{ + // TODO: Come up w/ a better API + pub fn pop_bytes(&mut self) -> Option { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions.recv.pop_bytes(&mut self.recv) + } +} + impl Drop for Chunk where P: Peer, B: Buf, diff --git a/tests/stream_states.rs b/tests/stream_states.rs index eef4318..e3c480e 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -5,8 +5,6 @@ extern crate log; pub mod support; use support::*; -use h2::Frame; - #[test] fn send_recv_headers_only() { let _ = env_logger::init(); @@ -103,9 +101,10 @@ fn send_recv_data() { assert!(Stream::wait(h2).next().is_none());; } +*/ #[test] -fn send_headers_recv_data() { +fn send_headers_recv_data_single_frame() { let _ = env_logger::init(); let mock = mock_io::Builder::new() @@ -123,51 +122,40 @@ fn send_headers_recv_data() { ]) .build(); - let h2 = client::handshake(mock) + let mut h2 = Client::handshake(mock) .wait().unwrap(); // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); - // Get the response headers - let (resp, h2) = h2.into_future().wait().unwrap(); + info!("sending request"); + let mut stream = h2.request(request, true).unwrap(); - match resp.unwrap() { - Frame::Headers { headers, .. } => { - assert_eq!(headers.status, status::OK); - } - _ => panic!("unexpected frame"), - } + let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(resp.status(), status::OK); - // Get the response body - let (data, h2) = h2.into_future().wait().unwrap(); + // Take the body + let (_, body) = resp.into_parts(); - match data.unwrap() { - Frame::Data { id, data, end_of_stream, .. } => { - assert_eq!(id, 1.into()); - assert_eq!(data, &b"hello"[..]); - assert!(!end_of_stream); - } - _ => panic!("unexpected frame"), - } + // Wait for all the data frames to be received + let mut chunks = h2.run(body.collect()).unwrap(); - // Get the response body - let (data, h2) = h2.into_future().wait().unwrap(); + // Only one chunk since two frames are coalesced. + assert_eq!(1, chunks.len()); - match data.unwrap() { - Frame::Data { id, data, end_of_stream, .. } => { - assert_eq!(id, 1.into()); - assert_eq!(data, &b"world"[..]); - assert!(end_of_stream); - } - _ => panic!("unexpected frame"), - } + let data = chunks[0].pop_bytes().unwrap(); + assert_eq!(data, &b"hello"[..]); - assert!(Stream::wait(h2).next().is_none());; + let data = chunks[0].pop_bytes().unwrap(); + assert_eq!(data, &b"world"[..]); + + // The H2 connection is closed + h2.wait().unwrap(); } +/* #[test] fn send_headers_twice_with_same_stream_id() { let _ = env_logger::init();