Get receiving data working
This commit is contained in:
@@ -3,7 +3,7 @@ use proto::{self, Connection};
|
||||
use error::Reason::*;
|
||||
|
||||
use http::{self, Request, Response};
|
||||
use futures::{Future, Poll, Sink, AsyncSink};
|
||||
use futures::{self, Future, Poll, Sink, AsyncSink};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use bytes::{Bytes, IntoBuf};
|
||||
|
||||
@@ -99,6 +99,19 @@ impl<T, B> Client<T, B>
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, B> Future for Client<T, B>
|
||||
// TODO: Get rid of 'static
|
||||
where T: AsyncRead + AsyncWrite + 'static,
|
||||
B: IntoBuf + 'static,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ConnectionError;
|
||||
|
||||
fn poll(&mut self) -> Poll<(), ConnectionError> {
|
||||
self.connection.poll()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, B> fmt::Debug for Client<T, B>
|
||||
where T: fmt::Debug,
|
||||
B: fmt::Debug + IntoBuf,
|
||||
@@ -167,6 +180,28 @@ impl<B: IntoBuf> Future for Stream<B> {
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Body =====
|
||||
|
||||
impl<B: IntoBuf> futures::Stream for Body<B> {
|
||||
type Item = Chunk<B>;
|
||||
type Error = ConnectionError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
let chunk = try_ready!(self.inner.poll_data())
|
||||
.map(|inner| Chunk { inner });
|
||||
|
||||
Ok(chunk.into())
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Chunk =====
|
||||
|
||||
impl<B: IntoBuf> Chunk<B> {
|
||||
pub fn pop_bytes(&mut self) -> Option<Bytes> {
|
||||
self.inner.pop_bytes()
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Peer =====
|
||||
|
||||
impl proto::Peer for Peer {
|
||||
|
||||
@@ -236,8 +236,12 @@ impl<P, B> Recv<P, B>
|
||||
.take_while(&mut self.buffer, |frame| frame.is_data());
|
||||
|
||||
if frames.is_empty() {
|
||||
stream.recv_task = Some(task::current());
|
||||
Ok(Async::NotReady)
|
||||
if stream.state.is_recv_closed() {
|
||||
Ok(None.into())
|
||||
} else {
|
||||
stream.recv_task = Some(task::current());
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
} else {
|
||||
Ok(Some(Chunk {
|
||||
pending_recv: frames,
|
||||
|
||||
@@ -203,6 +203,13 @@ impl State {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_recv_closed(&self) -> bool {
|
||||
match self.inner {
|
||||
Closed(..) | HalfClosedRemote(..) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> {
|
||||
match self.inner {
|
||||
Open { ref mut remote, .. } |
|
||||
|
||||
@@ -368,6 +368,19 @@ impl<P, B> Clone for StreamRef<P, B> {
|
||||
|
||||
// ===== impl Chunk =====
|
||||
|
||||
impl<P, B> Chunk<P, B>
|
||||
where P: Peer,
|
||||
B: Buf,
|
||||
{
|
||||
// TODO: Come up w/ a better API
|
||||
pub fn pop_bytes(&mut self) -> Option<Bytes> {
|
||||
let mut me = self.inner.lock().unwrap();
|
||||
let me = &mut *me;
|
||||
|
||||
me.actions.recv.pop_bytes(&mut self.recv)
|
||||
}
|
||||
}
|
||||
|
||||
impl<P, B> Drop for Chunk<P, B>
|
||||
where P: Peer,
|
||||
B: Buf,
|
||||
|
||||
@@ -5,8 +5,6 @@ extern crate log;
|
||||
pub mod support;
|
||||
use support::*;
|
||||
|
||||
use h2::Frame;
|
||||
|
||||
#[test]
|
||||
fn send_recv_headers_only() {
|
||||
let _ = env_logger::init();
|
||||
@@ -103,9 +101,10 @@ fn send_recv_data() {
|
||||
|
||||
assert!(Stream::wait(h2).next().is_none());;
|
||||
}
|
||||
*/
|
||||
|
||||
#[test]
|
||||
fn send_headers_recv_data() {
|
||||
fn send_headers_recv_data_single_frame() {
|
||||
let _ = env_logger::init();
|
||||
|
||||
let mock = mock_io::Builder::new()
|
||||
@@ -123,51 +122,40 @@ fn send_headers_recv_data() {
|
||||
])
|
||||
.build();
|
||||
|
||||
let h2 = client::handshake(mock)
|
||||
let mut h2 = Client::handshake(mock)
|
||||
.wait().unwrap();
|
||||
|
||||
// Send the request
|
||||
let mut request = request::Head::default();
|
||||
request.uri = "https://http2.akamai.com/".parse().unwrap();
|
||||
let h2 = h2.send_request(1.into(), request, true).wait().unwrap();
|
||||
let request = Request::builder()
|
||||
.uri("https://http2.akamai.com/")
|
||||
.body(()).unwrap();
|
||||
|
||||
// Get the response headers
|
||||
let (resp, h2) = h2.into_future().wait().unwrap();
|
||||
info!("sending request");
|
||||
let mut stream = h2.request(request, true).unwrap();
|
||||
|
||||
match resp.unwrap() {
|
||||
Frame::Headers { headers, .. } => {
|
||||
assert_eq!(headers.status, status::OK);
|
||||
}
|
||||
_ => panic!("unexpected frame"),
|
||||
}
|
||||
let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap();
|
||||
assert_eq!(resp.status(), status::OK);
|
||||
|
||||
// Get the response body
|
||||
let (data, h2) = h2.into_future().wait().unwrap();
|
||||
// Take the body
|
||||
let (_, body) = resp.into_parts();
|
||||
|
||||
match data.unwrap() {
|
||||
Frame::Data { id, data, end_of_stream, .. } => {
|
||||
assert_eq!(id, 1.into());
|
||||
assert_eq!(data, &b"hello"[..]);
|
||||
assert!(!end_of_stream);
|
||||
}
|
||||
_ => panic!("unexpected frame"),
|
||||
}
|
||||
// Wait for all the data frames to be received
|
||||
let mut chunks = h2.run(body.collect()).unwrap();
|
||||
|
||||
// Get the response body
|
||||
let (data, h2) = h2.into_future().wait().unwrap();
|
||||
// Only one chunk since two frames are coalesced.
|
||||
assert_eq!(1, chunks.len());
|
||||
|
||||
match data.unwrap() {
|
||||
Frame::Data { id, data, end_of_stream, .. } => {
|
||||
assert_eq!(id, 1.into());
|
||||
assert_eq!(data, &b"world"[..]);
|
||||
assert!(end_of_stream);
|
||||
}
|
||||
_ => panic!("unexpected frame"),
|
||||
}
|
||||
let data = chunks[0].pop_bytes().unwrap();
|
||||
assert_eq!(data, &b"hello"[..]);
|
||||
|
||||
assert!(Stream::wait(h2).next().is_none());;
|
||||
let data = chunks[0].pop_bytes().unwrap();
|
||||
assert_eq!(data, &b"world"[..]);
|
||||
|
||||
// The H2 connection is closed
|
||||
h2.wait().unwrap();
|
||||
}
|
||||
|
||||
/*
|
||||
#[test]
|
||||
fn send_headers_twice_with_same_stream_id() {
|
||||
let _ = env_logger::init();
|
||||
|
||||
Reference in New Issue
Block a user