diff --git a/examples/client.rs b/examples/client.rs index b0a78a9..d0d34dd 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,27 +1,62 @@ extern crate h2; extern crate http; +extern crate bytes; extern crate futures; extern crate tokio_io; extern crate tokio_core; extern crate io_dump; extern crate env_logger; +use h2::*; use h2::client::Client; use http::*; use futures::*; +use bytes::*; use tokio_core::reactor; use tokio_core::net::TcpStream; +struct Process { + body: Body, + trailers: bool, +} + +impl Future for Process { + type Item = (); + type Error = ConnectionError; + + fn poll(&mut self) -> Poll<(), ConnectionError> { + loop { + if self.trailers { + let trailers = try_ready!(self.body.poll_trailers()); + + println!("GOT TRAILERS: {:?}", trailers); + + return Ok(().into()); + } else { + match try_ready!(self.body.poll()) { + Some(chunk) => { + println!("GOT CHUNK = {:?}", chunk); + } + None => { + self.trailers = true; + } + } + } + } + } +} + pub fn main() { let _ = env_logger::init(); let mut core = reactor::Core::new().unwrap();; + let handle = core.handle(); let tcp = TcpStream::connect( &"127.0.0.1:5928".parse().unwrap(), - &core.handle()); + &handle); let tcp = tcp.then(|res| { let tcp = io_dump::Dump::to_stdout(res.unwrap()); @@ -36,11 +71,30 @@ pub fn main() { .uri("https://http2.akamai.com/") .body(()).unwrap(); - let stream = client.request(request, true).unwrap(); - client.join(stream.and_then(|response| { + let mut trailers = h2::HeaderMap::new(); + trailers.insert("zomg", "hello".parse().unwrap()); + + let mut stream = client.request(request, false).unwrap(); + + // send trailers + stream.send_trailers(trailers).unwrap(); + + // Spawn a task to run the client... + handle.spawn(client.map_err(|e| println!("GOT ERR={:?}", e))); + + stream.and_then(|response| { println!("GOT RESPONSE: {:?}", response); - Ok(()) - })) + + // Get the body + let (_, body) = response.into_parts(); + + Process { + body, + trailers: false, + } + }).map_err(|e| { + println!("GOT ERR={:?}", e); + }) }) ; diff --git a/examples/server-tr.rs b/examples/server-tr.rs new file mode 100644 index 0000000..5f7dc05 --- /dev/null +++ b/examples/server-tr.rs @@ -0,0 +1,83 @@ +extern crate h2; +extern crate http; +extern crate bytes; +extern crate futures; +extern crate tokio_io; +extern crate tokio_core; +extern crate io_dump; +extern crate env_logger; + +use h2::server::Server; + +use http::*; +use bytes::*; +use futures::*; + +use tokio_core::reactor; +use tokio_core::net::TcpListener; + +pub fn main() { + let _ = env_logger::init(); + + let mut core = reactor::Core::new().unwrap();; + let handle = core.handle(); + + let listener = TcpListener::bind( + &"127.0.0.1:5928".parse().unwrap(), + &handle).unwrap(); + + println!("listening on {:?}", listener.local_addr()); + + let server = listener.incoming().for_each(move |(socket, _)| { + // let socket = io_dump::Dump::to_stdout(socket); + + + let connection = Server::handshake(socket) + .and_then(|conn| { + println!("H2 connection bound"); + + conn.for_each(|(request, mut stream)| { + println!("GOT request: {:?}", request); + + let response = Response::builder() + .status(status::OK) + .body(()).unwrap(); + + if let Err(e) = stream.send_response(response, false) { + println!(" error responding; err={:?}", e); + } + + println!(">>>> sending data"); + if let Err(e) = stream.send_data(Bytes::from_static(b"hello world"), false) { + println!(" -> err={:?}", e); + } + + let mut hdrs = HeaderMap::new(); + hdrs.insert("status", "ok".parse().unwrap()); + + println!(">>>> sending trailers"); + if let Err(e) = stream.send_trailers(hdrs) { + println!(" -> err={:?}", e); + } + + Ok(()) + }).and_then(|_| { + println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~"); + Ok(()) + }) + }) + .then(|res| { + if let Err(e) = res { + println!(" -> err={:?}", e); + } + + Ok(()) + }) + ; + + handle.spawn(connection); + Ok(()) + }); + + core.run(server).unwrap(); +} diff --git a/src/client.rs b/src/client.rs index a9eafc0..b3b52b2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,4 @@ -use {frame, ConnectionError}; +use {frame, HeaderMap, ConnectionError}; use Body; use frame::StreamId; use proto::{self, Connection, WindowSize}; @@ -173,10 +173,10 @@ impl Stream { } /// Send trailers - pub fn send_trailers(&mut self, _trailers: ()) + pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ConnectionError> { - unimplemented!(); + self.inner.send_trailers::(trailers) } } diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 99edb6e..d90a094 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -98,6 +98,7 @@ const ALL: u8 = END_STREAM // ===== impl Headers ===== impl Headers { + /// Create a new HEADERS frame pub fn new(stream_id: StreamId, pseudo: Pseudo, fields: HeaderMap) -> Self { Headers { stream_id: stream_id, @@ -108,6 +109,19 @@ impl Headers { } } + pub fn trailers(stream_id: StreamId, fields: HeaderMap) -> Self { + let mut flags = HeadersFlag::default(); + flags.set_end_stream(); + + Headers { + stream_id, + stream_dep: None, + fields: fields, + pseudo: Pseudo::default(), + flags: flags, + } + } + /// Loads the header frame but doesn't actually do HPACK decoding. /// /// HPACK decoding is done in the `load_hpack` step. @@ -290,6 +304,10 @@ impl Headers { Ok(request) } + pub fn into_fields(self) -> HeaderMap { + self.fields + } + pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut) -> Option { diff --git a/src/lib.rs b/src/lib.rs index 1892e8d..94d6ec0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,6 +61,13 @@ impl Body { pub fn release_capacity(&mut self, sz: usize) -> Result<(), ConnectionError> { self.inner.release_capacity(sz as proto::WindowSize) } + + /// Poll trailers + /// + /// This function **must** not be called until `Body::poll` returns `None`. + pub fn poll_trailers(&mut self) -> Poll, ConnectionError> { + self.inner.poll_trailers() + } } impl futures::Stream for Body { diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index c138d4c..0cf0977 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -43,6 +43,8 @@ impl Prioritize flow.assign_capacity(config.init_local_window_sz); + trace!("Prioritize::new; flow={:?}", flow); + Prioritize { pending_send: store::Queue::new(), pending_capacity: store::Queue::new(), diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 79e98e1..9756162 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,4 +1,4 @@ -use {client, server, frame, ConnectionError}; +use {client, server, frame, HeaderMap, ConnectionError}; use proto::*; use super::*; @@ -525,19 +525,15 @@ impl Recv where B: Buf { -> Poll, ConnectionError> { match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Frame::Data(frame)) => { + Ok(Some(frame.into_payload()).into()) + } Some(frame) => { - match frame { - Frame::Data(frame) => { - Ok(Some(frame.into_payload()).into()) - } - frame => { - // Frame is trailer - stream.pending_recv.push_front(&mut self.buffer, frame); + // Frame is trailer + stream.pending_recv.push_front(&mut self.buffer, frame); - // No more data frames - Ok(None.into()) - } - } + // No more data frames + Ok(None.into()) } None => { if stream.state.is_recv_closed() { @@ -552,6 +548,32 @@ impl Recv where B: Buf { } } + pub fn poll_trailers(&mut self, stream: &mut Stream) + -> Poll, ConnectionError> + { + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Frame::Headers(frame)) => { + Ok(Some(frame.into_fields()).into()) + } + Some(_) => { + // TODO: This is a user error. `poll_trailers` was called before + // the entire set of data frames have been consumed. What should + // we do? + unimplemented!(); + } + None => { + if stream.state.is_recv_closed() { + // There will be no trailer frame + Ok(None.into()) + } else { + // Request to get notified once another frame arrives + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } + } + } + } + fn reset(&mut self, _stream_id: StreamId, _reason: Reason) { unimplemented!(); } diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index fdab877..989c783 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -133,6 +133,25 @@ impl Send where B: Buf { self.prioritize.send_data(frame, stream, task) } + pub fn send_trailers(&mut self, + frame: frame::Headers, + stream: &mut store::Ptr, + task: &mut Option) + -> Result<(), ConnectionError> + { + // TODO: Should this logic be moved into state.rs? + if !stream.state.is_send_streaming() { + return Err(UnexpectedFrameType.into()); + } + + stream.state.send_close()?; + + trace!("send_trailers -- queuing; frame={:?}", frame); + self.prioritize.queue_frame(frame.into(), stream, task); + + Ok(()) + } + pub fn poll_complete(&mut self, store: &mut Store, dst: &mut Codec>) diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index c51f5e9..ab2baa8 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1,4 +1,4 @@ -use {client, server}; +use {client, server, HeaderMap}; use proto::*; use super::*; @@ -335,6 +335,22 @@ impl StreamRef }) } + pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ConnectionError> + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.key); + + // Create the trailers frame + let frame = frame::Headers::trailers(stream.id, trailers); + + me.actions.transition::(stream, |actions, stream| { + // Send the trailers frame + actions.send.send_trailers(frame, stream, &mut actions.task) + }) + } + /// Called by the server after the stream is accepted. Given that clients /// initialize streams by sending HEADERS, the request will always be /// available. @@ -403,6 +419,15 @@ impl StreamRef me.actions.recv.poll_data(&mut stream) } + pub fn poll_trailers(&mut self) -> Poll, ConnectionError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.poll_trailers(&mut stream) + } + /// Releases recv capacity back to the peer. This will result in sending /// WINDOW_UPDATE frames on both the stream and connection. pub fn release_capacity(&mut self, capacity: WindowSize) diff --git a/src/server.rs b/src/server.rs index fa4fe47..9919a89 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,4 @@ -use {Body, ConnectionError}; +use {Body, HeaderMap, ConnectionError}; use frame::{self, StreamId}; use proto::{self, Connection, WindowSize}; use error::Reason; @@ -185,10 +185,10 @@ impl Stream { } /// Send trailers - pub fn send_trailers(&mut self, _trailers: ()) + pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ConnectionError> { - unimplemented!(); + self.inner.send_trailers::(trailers) } pub fn send_reset(mut self, reason: Reason) -> Result<(), ConnectionError> { diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 8f58b44..547b76f 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -22,6 +22,7 @@ pub use self::http::{ status, Request, Response, + HeaderMap, }; pub use self::h2::client::{self, Client}; diff --git a/tests/trailers.rs b/tests/trailers.rs index 223078f..98fd0da 100644 --- a/tests/trailers.rs +++ b/tests/trailers.rs @@ -1,4 +1,113 @@ +#[macro_use] +extern crate log; + +#[macro_use] +pub mod support; +use support::*; #[test] -fn recv_trailers_without_eos() { +fn recv_trailers_only() { + let _ = env_logger::init(); + + let mock = mock_io::Builder::new() + .handshake() + // Write GET / + .write(&[ + 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, + 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, + ]) + .write(frames::SETTINGS_ACK) + // Read response + .read(&[ + 0, 0, 1, 1, 4, 0, 0, 0, 1, 0x88, 0, 0, 9, 1, 5, 0, 0, 0, 1, + 0x40, 0x84, 0x42, 0x46, 0x9B, 0x51, 0x82, 0x3F, 0x5F, + ]) + .build(); + + let mut h2 = Client::handshake(mock) + .wait().unwrap(); + + // Send the request + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + info!("sending request"); + let mut stream = h2.request(request, true).unwrap(); + + let response = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(response.status(), status::OK); + + let (_, mut body) = response.into_parts(); + + // Make sure there is no body + let chunk = h2.run(poll_fn(|| body.poll())).unwrap(); + assert!(chunk.is_none()); + + let trailers = h2.run(poll_fn(|| body.poll_trailers())).unwrap().unwrap(); + assert_eq!(1, trailers.len()); + assert_eq!(trailers["status"], "ok"); + + h2.wait().unwrap(); +} + +#[test] +fn send_trailers_immediately() { + let _ = env_logger::init(); + + let mock = mock_io::Builder::new() + .handshake() + // Write GET / + .write(&[ + 0, 0, 0x10, 1, 4, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, + 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, 0, 0, + 0x0A, 1, 5, 0, 0, 0, 1, 0x40, 0x83, 0xF6, 0x7A, 0x66, 0x84, 0x9C, + 0xB4, 0x50, 0x7F, + ]) + .write(frames::SETTINGS_ACK) + // Read response + .read(&[ + 0, 0, 1, 1, 4, 0, 0, 0, 1, 0x88, 0, 0, 0x0B, 0, 1, 0, 0, 0, 1, + 0x68, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x77, 0x6F, 0x72, 0x6C, 0x64, + ]) + .build(); + + let mut h2 = Client::handshake(mock) + .wait().unwrap(); + + // Send the request + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + info!("sending request"); + let mut stream = h2.request(request, false).unwrap(); + + let mut trailers = HeaderMap::new(); + trailers.insert("zomg", "hello".parse().unwrap()); + + stream.send_trailers(trailers).unwrap(); + + let response = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(response.status(), status::OK); + + let (_, mut body) = response.into_parts(); + + // There is a data chunk + let chunk = h2.run(poll_fn(|| body.poll())).unwrap(); + assert!(chunk.is_some()); + + let chunk = h2.run(poll_fn(|| body.poll())).unwrap(); + assert!(chunk.is_none()); + + let trailers = h2.run(poll_fn(|| body.poll_trailers())).unwrap(); + assert!(trailers.is_none()); + + h2.wait().unwrap(); +} + +#[test] +#[ignore] +fn recv_trailers_without_eos() { + // This should be a protocol error? }