From 71da8d121f39ea40674329d6e0e5999816ffb5d6 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 7 Aug 2017 21:01:15 -0700 Subject: [PATCH] Start hooking up sending data --- src/client.rs | 2 +- src/proto/streams/recv.rs | 2 +- src/proto/streams/send.rs | 10 ++++---- src/proto/streams/stream.rs | 6 ++++- src/proto/streams/streams.rs | 29 ++++++++++++++++++++--- tests/stream_states.rs | 45 +++++++++++++++++++++++++++++------- 6 files changed, 76 insertions(+), 18 deletions(-) diff --git a/src/client.rs b/src/client.rs index ccec356..c9ab889 100644 --- a/src/client.rs +++ b/src/client.rs @@ -160,7 +160,7 @@ impl Stream { pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ConnectionError> { - unimplemented!(); + self.inner.send_data(data.into_buf(), end_of_stream) } /// Send trailers diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index f7ceba9..0c17434 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -73,7 +73,7 @@ impl Recv // Increment the number of remote initiated streams self.num_streams += 1; - Ok(Some(Stream::new())) + Ok(Some(Stream::new(id))) } /// Transition the stream state based on receiving headers diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index a6d8031..8650a31 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -67,7 +67,7 @@ impl Send /// Update state reflecting a new, locally opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self) -> Result<(StreamId, Stream), ConnectionError> { + pub fn open(&mut self) -> Result, ConnectionError> { try!(self.ensure_can_open()); if let Some(max) = self.max_streams { @@ -76,7 +76,7 @@ impl Send } } - let ret = (self.next_stream_id, Stream::new()); + let ret = Stream::new(self.next_stream_id); // Increment the number of locally initiated streams self.num_streams += 1; @@ -106,8 +106,8 @@ impl Send } pub fn send_data(&mut self, - frame: &frame::Data, - stream: &mut Stream) + frame: frame::Data, + stream: &mut store::Ptr) -> Result<(), ConnectionError> { let sz = frame.payload().remaining(); @@ -148,6 +148,8 @@ impl Send try!(stream.state.send_close()); } + self.prioritize.queue_frame(frame.into(), stream); + Ok(()) } diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 588edcb..d26266c 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -2,6 +2,9 @@ use super::*; #[derive(Debug)] pub(super) struct Stream { + /// The h2 stream identifier + pub id: StreamId, + /// Current state of the stream pub state: State, @@ -22,8 +25,9 @@ pub(super) struct Stream { } impl Stream { - pub fn new() -> Stream { + pub fn new(id: StreamId) -> Stream { Stream { + id, state: State::default(), pending_recv: buffer::Deque::new(), recv_task: None, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 9f59a2d..f79e341 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -200,6 +200,7 @@ impl Streams */ } + /* pub fn send_data(&mut self, frame: &frame::Data) -> Result<(), ConnectionError> { @@ -222,6 +223,7 @@ impl Streams Ok(()) } + */ pub fn poll_window_update(&mut self) -> Poll @@ -290,13 +292,13 @@ impl Streams let me = &mut *me; // Initialize a new stream. This fails if the connection is at capacity. - let (id, mut stream) = me.actions.send.open()?; + let mut stream = me.actions.send.open()?; // Convert the message let headers = client::Peer::convert_send_message( - id, request, end_of_stream); + stream.id, request, end_of_stream); - let mut stream = me.store.insert(id, stream); + let mut stream = me.store.insert(stream.id, stream); me.actions.send.send_headers(headers, &mut stream)?; @@ -320,6 +322,27 @@ impl StreamRef where P: Peer, B: Buf, { + pub fn send_data(&mut self, data: B, end_of_stream: bool) + -> Result<(), ConnectionError> + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + // Create the data frame + let frame = frame::Data::from_buf(stream.id, data, end_of_stream); + + // Send the data frame + me.actions.send.send_data(frame, &mut stream)?; + + if stream.state.is_closed() { + me.actions.dec_num_streams(stream.id); + } + + Ok(()) + } + pub fn poll_data(&mut self) -> Poll>, ConnectionError> { let recv = { let mut me = self.inner.lock().unwrap(); diff --git a/tests/stream_states.rs b/tests/stream_states.rs index e3c480e..224a2a3 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -38,7 +38,6 @@ fn send_recv_headers_only() { h2.wait().unwrap(); } -/* #[test] fn send_recv_data() { let _ = env_logger::init(); @@ -64,14 +63,42 @@ fn send_recv_data() { ]) .build(); - let h2 = client::handshake(mock).wait().expect("handshake"); + let mut h2 = Client::handshake2(mock) + .wait().unwrap(); - // Send the request - let mut request = request::Head::default(); - request.method = method::POST; - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, false).wait().expect("send request"); + let request = Request::builder() + .method(method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + info!("sending request"); + let mut stream = h2.request(request, false).unwrap(); + + // Send the data + stream.send_data("hello", true).unwrap(); + + // Get the response + let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(resp.status(), status::OK); + + // Take the body + let (_, body) = resp.into_parts(); + + // Wait for all the data frames to be received + let mut chunks = h2.run(body.collect()).unwrap(); + + // Only one chunk since two frames are coalesced. + assert_eq!(1, chunks.len()); + + let data = chunks[0].pop_bytes().unwrap(); + assert_eq!(data, &b"world"[..]); + + assert!(chunks[0].pop_bytes().is_none()); + + // The H2 connection is closed + h2.wait().unwrap(); + + /* let b = "hello"; // Send the data @@ -100,8 +127,8 @@ fn send_recv_data() { } assert!(Stream::wait(h2).next().is_none());; + */ } -*/ #[test] fn send_headers_recv_data_single_frame() { @@ -151,6 +178,8 @@ fn send_headers_recv_data_single_frame() { let data = chunks[0].pop_bytes().unwrap(); assert_eq!(data, &b"world"[..]); + assert!(chunks[0].pop_bytes().is_none()); + // The H2 connection is closed h2.wait().unwrap(); }