use codec::{Codec, RecvError}; use frame::{Headers, Pseudo, Settings, StreamId}; use frame::Reason::*; use proto::{self, Connection, WindowSize}; use bytes::{Bytes, IntoBuf}; use futures::{Async, Future, MapErr, Poll}; use http::{HeaderMap, Request, Response}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::WriteAll; use std::fmt; use std::io; use std::marker::PhantomData; /// In progress H2 connection binding pub struct Handshake { inner: MapErr, fn(io::Error) -> ::Error>, settings: Settings, _marker: PhantomData, } /// Marker type indicating a client peer pub struct Client { connection: Connection, } #[derive(Debug)] pub struct Stream { inner: proto::StreamRef, } #[derive(Debug)] pub struct Body { inner: proto::StreamRef, } /// Build a Client. #[derive(Debug, Default)] pub struct Builder { settings: Settings, } #[derive(Debug)] pub(crate) struct Peer; impl Client where T: AsyncRead + AsyncWrite, { /// Bind an H2 client connection. /// /// Returns a future which resolves to the connection value once the H2 /// handshake has been completed. /// /// It's important to note that this does not **flush** the outbound /// settings to the wire. pub fn handshake(io: T) -> Handshake { Builder::default().handshake(io) } } impl Client<(), Bytes> { /// Creates a Client Builder to customize a Client before binding. pub fn builder() -> Builder { Builder::default() } } impl Client where T: AsyncRead + AsyncWrite, B: IntoBuf { fn handshake2(io: T, settings: Settings) -> Handshake { use tokio_io::io; debug!("binding client connection"); let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; let handshake = io::write_all(io, msg) .map_err(::Error::from as _); Handshake { inner: handshake, settings: settings, _marker: PhantomData, } } /// Returns `Ready` when the connection can initialize a new HTTP 2.0 /// stream. pub fn poll_ready(&mut self) -> Poll<(), ::Error> { Ok(self.connection.poll_send_request_ready()) } /// Send a request on a new HTTP 2.0 stream pub fn request( &mut self, request: Request<()>, end_of_stream: bool, ) -> Result, ::Error> { self.connection .send_request(request, end_of_stream) .map_err(Into::into) .map(|stream| { Stream { inner: stream, } }) } } impl Future for Client where T: AsyncRead + AsyncWrite, B: IntoBuf, { type Item = (); type Error = ::Error; fn poll(&mut self) -> Poll<(), ::Error> { self.connection.poll().map_err(Into::into) } } impl fmt::Debug for Client where T: AsyncRead + AsyncWrite, T: fmt::Debug, B: fmt::Debug + IntoBuf, B::Buf: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("Client") .field("connection", &self.connection) .finish() } } #[cfg(feature = "unstable")] impl Client where T: AsyncRead + AsyncWrite, B: IntoBuf, { /// Returns the number of active streams. /// /// An active stream is a stream that has not yet transitioned to a closed /// state. pub fn num_active_streams(&self) -> usize { self.connection.num_active_streams() } /// Returns the number of streams that are held in memory. /// /// A wired stream is a stream that is either active or is closed but must /// stay in memory for some reason. For example, there are still outstanding /// userspace handles pointing to the slot. pub fn num_wired_streams(&self) -> usize { self.connection.num_wired_streams() } } // ===== impl Builder ===== impl Builder { /// Set the initial window size of the remote peer. pub fn initial_window_size(&mut self, size: u32) -> &mut Self { self.settings.set_initial_window_size(Some(size)); self } /// Bind an H2 client connection. /// /// Returns a future which resolves to the connection value once the H2 /// handshake has been completed. /// /// It's important to note that this does not **flush** the outbound /// settings to the wire. pub fn handshake(&self, io: T) -> Handshake where T: AsyncRead + AsyncWrite, B: IntoBuf { Client::handshake2(io, self.settings.clone()) } } // ===== impl Handshake ===== impl Future for Handshake where T: AsyncRead + AsyncWrite, { type Item = Client; type Error = ::Error; fn poll(&mut self) -> Poll { let (io, _) = try_ready!(self.inner.poll()); debug!("client connection bound"); // Create the codec let mut codec = Codec::new(io); // Send initial settings frame codec.buffer(self.settings.clone().into()) .expect("invalid SETTINGS frame"); let connection = Connection::new(codec, &self.settings); Ok(Async::Ready(Client { connection, })) } } impl fmt::Debug for Handshake where T: AsyncRead + AsyncWrite, T: fmt::Debug, B: fmt::Debug + IntoBuf, B::Buf: fmt::Debug + IntoBuf, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "client::Handshake") } } // ===== impl Stream ===== impl Stream { /// Receive the HTTP/2.0 response, if it is ready. pub fn poll_response(&mut self) -> Poll>, ::Error> { let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); let body = Body { inner: self.inner.clone(), }; Ok(Response::from_parts(parts, body).into()) } /// Request capacity to send data pub fn reserve_capacity(&mut self, capacity: usize) { // TODO: Check for overflow self.inner.reserve_capacity(capacity as WindowSize) } /// Returns the stream's current send capacity. pub fn capacity(&self) -> usize { self.inner.capacity() as usize } /// Request to be notified when the stream's capacity increases pub fn poll_capacity(&mut self) -> Poll, ::Error> { let res = try_ready!(self.inner.poll_capacity()); Ok(Async::Ready(res.map(|v| v as usize))) } /// Send data pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ::Error> { self.inner .send_data(data.into_buf(), end_of_stream) .map_err(Into::into) } /// Send trailers pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ::Error> { self.inner.send_trailers(trailers).map_err(Into::into) } } impl Future for Stream { type Item = Response>; type Error = ::Error; fn poll(&mut self) -> Poll { self.poll_response() } } // ===== impl Body ===== impl Body { pub fn is_empty(&self) -> bool { // If the recv side is closed and the receive queue is empty, the body is empty. self.inner.body_is_empty() } pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { self.inner .release_capacity(sz as proto::WindowSize) .map_err(Into::into) } /// Poll trailers /// /// This function **must** not be called until `Body::poll` returns `None`. pub fn poll_trailers(&mut self) -> Poll, ::Error> { self.inner.poll_trailers().map_err(Into::into) } } impl ::futures::Stream for Body { type Item = Bytes; type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { self.inner.poll_data().map_err(Into::into) } } // ===== impl Peer ===== impl proto::Peer for Peer { type Send = Request<()>; type Poll = Response<()>; fn is_server() -> bool { false } fn convert_send_message(id: StreamId, request: Self::Send, end_of_stream: bool) -> Headers { use http::request::Parts; let ( Parts { method, uri, headers, .. }, _, ) = request.into_parts(); // Build the set pseudo header set. All requests will include `method` // and `path`. let pseudo = Pseudo::request(method, uri); // Create the HEADERS frame let mut frame = Headers::new(id, pseudo, headers); if end_of_stream { frame.set_end_stream() } frame } fn convert_poll_message(headers: Headers) -> Result { let mut b = Response::builder(); let stream_id = headers.stream_id(); let (pseudo, fields) = headers.into_parts(); if let Some(status) = pseudo.status { b.status(status); } let mut response = match b.body(()) { Ok(response) => response, Err(_) => { // TODO: Should there be more specialized handling for different // kinds of errors return Err(RecvError::Stream { id: stream_id, reason: ProtocolError, }); }, }; *response.headers_mut() = fields; Ok(response) } }