feat(client): introduce version-specific client modules (#2906)
This creates `client::conn::http1` and `client::conn::http2` modules, both with specific `SendRequest`, `Connection`, and `Builder` types.
This commit is contained in:
504
src/client/conn/http1.rs
Normal file
504
src/client/conn/http1.rs
Normal file
@@ -0,0 +1,504 @@
|
||||
//! HTTP/1 client connections
|
||||
|
||||
use std::error::Error as StdError;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
use http::{Request, Response};
|
||||
use httparse::ParserConfig;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use crate::Body;
|
||||
use crate::body::HttpBody;
|
||||
use crate::common::{
|
||||
exec::{BoxSendFuture, Exec},
|
||||
task, Future, Pin, Poll,
|
||||
};
|
||||
use crate::upgrade::Upgraded;
|
||||
use crate::proto;
|
||||
use crate::rt::Executor;
|
||||
use super::super::dispatch;
|
||||
|
||||
type Dispatcher<T, B> =
|
||||
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
|
||||
|
||||
/// The sender side of an established connection.
|
||||
pub struct SendRequest<B> {
|
||||
dispatch: dispatch::Sender<Request<B>, Response<Body>>,
|
||||
}
|
||||
|
||||
/// A future that processes all HTTP state for the IO object.
|
||||
///
|
||||
/// In most cases, this should just be spawned into an executor, so that it
|
||||
/// can process incoming and outgoing messages, notice hangups, and the like.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct Connection<T, B>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Send + 'static,
|
||||
B: HttpBody + 'static,
|
||||
{
|
||||
inner: Option<Dispatcher<T, B>>,
|
||||
}
|
||||
|
||||
/// A builder to configure an HTTP connection.
|
||||
///
|
||||
/// After setting options, the builder is used to create a handshake future.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Builder {
|
||||
pub(super) exec: Exec,
|
||||
h09_responses: bool,
|
||||
h1_parser_config: ParserConfig,
|
||||
h1_writev: Option<bool>,
|
||||
h1_title_case_headers: bool,
|
||||
h1_preserve_header_case: bool,
|
||||
#[cfg(feature = "ffi")]
|
||||
h1_preserve_header_order: bool,
|
||||
h1_read_buf_exact_size: Option<usize>,
|
||||
h1_max_buf_size: Option<usize>,
|
||||
}
|
||||
|
||||
/// Returns a handshake future over some IO.
|
||||
///
|
||||
/// This is a shortcut for `Builder::new().handshake(io)`.
|
||||
/// See [`client::conn`](crate::client::conn) for more.
|
||||
pub async fn handshake<T>(
|
||||
io: T,
|
||||
) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
Builder::new().handshake(io).await
|
||||
}
|
||||
|
||||
// ===== impl SendRequest
|
||||
|
||||
impl<B> SendRequest<B> {
|
||||
/// Polls to determine whether this sender can be used yet for a request.
|
||||
///
|
||||
/// If the associated connection is closed, this returns an Error.
|
||||
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
|
||||
self.dispatch.poll_ready(cx)
|
||||
}
|
||||
|
||||
/*
|
||||
pub(super) async fn when_ready(self) -> crate::Result<Self> {
|
||||
let mut me = Some(self);
|
||||
future::poll_fn(move |cx| {
|
||||
ready!(me.as_mut().unwrap().poll_ready(cx))?;
|
||||
Poll::Ready(Ok(me.take().unwrap()))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) fn is_ready(&self) -> bool {
|
||||
self.dispatch.is_ready()
|
||||
}
|
||||
|
||||
pub(super) fn is_closed(&self) -> bool {
|
||||
self.dispatch.is_closed()
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
impl<B> SendRequest<B>
|
||||
where
|
||||
B: HttpBody + 'static,
|
||||
{
|
||||
/// Sends a `Request` on the associated connection.
|
||||
///
|
||||
/// Returns a future that if successful, yields the `Response`.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// There are some key differences in what automatic things the `Client`
|
||||
/// does for you that will not be done here:
|
||||
///
|
||||
/// - `Client` requires absolute-form `Uri`s, since the scheme and
|
||||
/// authority are needed to connect. They aren't required here.
|
||||
/// - Since the `Client` requires absolute-form `Uri`s, it can add
|
||||
/// the `Host` header based on it. You must add a `Host` header yourself
|
||||
/// before calling this method.
|
||||
/// - Since absolute-form `Uri`s are not required, if received, they will
|
||||
/// be serialized as-is.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # use http::header::HOST;
|
||||
/// # use hyper::client::conn::SendRequest;
|
||||
/// # use hyper::Body;
|
||||
/// use hyper::Request;
|
||||
///
|
||||
/// # async fn doc(mut tx: SendRequest<Body>) -> hyper::Result<()> {
|
||||
/// // build a Request
|
||||
/// let req = Request::builder()
|
||||
/// .uri("/foo/bar")
|
||||
/// .header(HOST, "hyper.rs")
|
||||
/// .body(Body::empty())
|
||||
/// .unwrap();
|
||||
///
|
||||
/// // send it and await a Response
|
||||
/// let res = tx.send_request(req).await?;
|
||||
/// // assert the Response
|
||||
/// assert!(res.status().is_success());
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// # fn main() {}
|
||||
/// ```
|
||||
pub fn send_request(&mut self, req: Request<B>) -> impl Future<Output = crate::Result<Response<Body>>> {
|
||||
let sent = self.dispatch.send(req);
|
||||
|
||||
async move {
|
||||
match sent {
|
||||
Ok(rx) => match rx.await {
|
||||
Ok(Ok(resp)) => Ok(resp),
|
||||
Ok(Err(err)) => Err(err),
|
||||
// this is definite bug if it happens, but it shouldn't happen!
|
||||
Err(_canceled) => panic!("dispatch dropped without returning error"),
|
||||
}
|
||||
Err(_req) => {
|
||||
tracing::debug!("connection was not ready");
|
||||
|
||||
Err(crate::Error::new_canceled().with("connection was not ready"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
pub(super) fn send_request_retryable(
|
||||
&mut self,
|
||||
req: Request<B>,
|
||||
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
|
||||
where
|
||||
B: Send,
|
||||
{
|
||||
match self.dispatch.try_send(req) {
|
||||
Ok(rx) => {
|
||||
Either::Left(rx.then(move |res| {
|
||||
match res {
|
||||
Ok(Ok(res)) => future::ok(res),
|
||||
Ok(Err(err)) => future::err(err),
|
||||
// this is definite bug if it happens, but it shouldn't happen!
|
||||
Err(_) => panic!("dispatch dropped without returning error"),
|
||||
}
|
||||
}))
|
||||
}
|
||||
Err(req) => {
|
||||
tracing::debug!("connection was not ready");
|
||||
let err = crate::Error::new_canceled().with("connection was not ready");
|
||||
Either::Right(future::err((err, Some(req))))
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
impl<B> fmt::Debug for SendRequest<B> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("SendRequest").finish()
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Connection
|
||||
|
||||
impl<T, B> fmt::Debug for Connection<T, B>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
|
||||
B: HttpBody + 'static,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Connection").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, B> Future for Connection<T, B>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
B: HttpBody + Send + 'static,
|
||||
B::Data: Send,
|
||||
B::Error: Into<Box<dyn StdError + Send + Sync>>,
|
||||
{
|
||||
type Output = crate::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||
match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
|
||||
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
|
||||
proto::Dispatched::Upgrade(pending) => match self.inner.take() {
|
||||
Some(h1) => {
|
||||
let (io, buf, _) = h1.into_inner();
|
||||
pending.fulfill(Upgraded::new(io, buf));
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
_ => {
|
||||
drop(pending);
|
||||
unreachable!("Upgraded twice");
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Builder
|
||||
|
||||
impl Builder {
|
||||
/// Creates a new connection builder.
|
||||
#[inline]
|
||||
pub fn new() -> Builder {
|
||||
Builder {
|
||||
exec: Exec::Default,
|
||||
h09_responses: false,
|
||||
h1_writev: None,
|
||||
h1_read_buf_exact_size: None,
|
||||
h1_parser_config: Default::default(),
|
||||
h1_title_case_headers: false,
|
||||
h1_preserve_header_case: false,
|
||||
#[cfg(feature = "ffi")]
|
||||
h1_preserve_header_order: false,
|
||||
h1_max_buf_size: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Provide an executor to execute background HTTP2 tasks.
|
||||
pub fn executor<E>(&mut self, exec: E) -> &mut Builder
|
||||
where
|
||||
E: Executor<BoxSendFuture> + Send + Sync + 'static,
|
||||
{
|
||||
self.exec = Exec::Executor(Arc::new(exec));
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether HTTP/0.9 responses should be tolerated.
|
||||
///
|
||||
/// Default is false.
|
||||
pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
|
||||
self.h09_responses = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether HTTP/1 connections will accept spaces between header names
|
||||
/// and the colon that follow them in responses.
|
||||
///
|
||||
/// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
|
||||
/// to say about it:
|
||||
///
|
||||
/// > No whitespace is allowed between the header field-name and colon. In
|
||||
/// > the past, differences in the handling of such whitespace have led to
|
||||
/// > security vulnerabilities in request routing and response handling. A
|
||||
/// > server MUST reject any received request message that contains
|
||||
/// > whitespace between a header field-name and colon with a response code
|
||||
/// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
|
||||
/// > response message before forwarding the message downstream.
|
||||
///
|
||||
/// Note that this setting does not affect HTTP/2.
|
||||
///
|
||||
/// Default is false.
|
||||
///
|
||||
/// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
|
||||
pub fn http1_allow_spaces_after_header_name_in_responses(
|
||||
&mut self,
|
||||
enabled: bool,
|
||||
) -> &mut Builder {
|
||||
self.h1_parser_config
|
||||
.allow_spaces_after_header_name_in_responses(enabled);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether HTTP/1 connections will accept obsolete line folding for
|
||||
/// header values.
|
||||
///
|
||||
/// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
|
||||
/// parsing.
|
||||
///
|
||||
/// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
|
||||
/// to say about it:
|
||||
///
|
||||
/// > A server that receives an obs-fold in a request message that is not
|
||||
/// > within a message/http container MUST either reject the message by
|
||||
/// > sending a 400 (Bad Request), preferably with a representation
|
||||
/// > explaining that obsolete line folding is unacceptable, or replace
|
||||
/// > each received obs-fold with one or more SP octets prior to
|
||||
/// > interpreting the field value or forwarding the message downstream.
|
||||
///
|
||||
/// > A proxy or gateway that receives an obs-fold in a response message
|
||||
/// > that is not within a message/http container MUST either discard the
|
||||
/// > message and replace it with a 502 (Bad Gateway) response, preferably
|
||||
/// > with a representation explaining that unacceptable line folding was
|
||||
/// > received, or replace each received obs-fold with one or more SP
|
||||
/// > octets prior to interpreting the field value or forwarding the
|
||||
/// > message downstream.
|
||||
///
|
||||
/// > A user agent that receives an obs-fold in a response message that is
|
||||
/// > not within a message/http container MUST replace each received
|
||||
/// > obs-fold with one or more SP octets prior to interpreting the field
|
||||
/// > value.
|
||||
///
|
||||
/// Note that this setting does not affect HTTP/2.
|
||||
///
|
||||
/// Default is false.
|
||||
///
|
||||
/// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
|
||||
pub fn http1_allow_obsolete_multiline_headers_in_responses(
|
||||
&mut self,
|
||||
enabled: bool,
|
||||
) -> &mut Builder {
|
||||
self.h1_parser_config
|
||||
.allow_obsolete_multiline_headers_in_responses(enabled);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether HTTP/1 connections should try to use vectored writes,
|
||||
/// or always flatten into a single buffer.
|
||||
///
|
||||
/// Note that setting this to false may mean more copies of body data,
|
||||
/// but may also improve performance when an IO transport doesn't
|
||||
/// support vectored writes well, such as most TLS implementations.
|
||||
///
|
||||
/// Setting this to true will force hyper to use queued strategy
|
||||
/// which may eliminate unnecessary cloning on some TLS backends
|
||||
///
|
||||
/// Default is `auto`. In this mode hyper will try to guess which
|
||||
/// mode to use
|
||||
pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
|
||||
self.h1_writev = Some(enabled);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether HTTP/1 connections will write header names as title case at
|
||||
/// the socket level.
|
||||
///
|
||||
/// Note that this setting does not affect HTTP/2.
|
||||
///
|
||||
/// Default is false.
|
||||
pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder {
|
||||
self.h1_title_case_headers = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether to support preserving original header cases.
|
||||
///
|
||||
/// Currently, this will record the original cases received, and store them
|
||||
/// in a private extension on the `Response`. It will also look for and use
|
||||
/// such an extension in any provided `Request`.
|
||||
///
|
||||
/// Since the relevant extension is still private, there is no way to
|
||||
/// interact with the original cases. The only effect this can have now is
|
||||
/// to forward the cases in a proxy-like fashion.
|
||||
///
|
||||
/// Note that this setting does not affect HTTP/2.
|
||||
///
|
||||
/// Default is false.
|
||||
pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
|
||||
self.h1_preserve_header_case = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether to support preserving original header order.
|
||||
///
|
||||
/// Currently, this will record the order in which headers are received, and store this
|
||||
/// ordering in a private extension on the `Response`. It will also look for and use
|
||||
/// such an extension in any provided `Request`.
|
||||
///
|
||||
/// Note that this setting does not affect HTTP/2.
|
||||
///
|
||||
/// Default is false.
|
||||
#[cfg(feature = "ffi")]
|
||||
pub fn http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
|
||||
self.h1_preserve_header_order = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the exact size of the read buffer to *always* use.
|
||||
///
|
||||
/// Note that setting this option unsets the `http1_max_buf_size` option.
|
||||
///
|
||||
/// Default is an adaptive read buffer.
|
||||
pub fn http1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
|
||||
self.h1_read_buf_exact_size = sz;
|
||||
self.h1_max_buf_size = None;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the maximum buffer size for the connection.
|
||||
///
|
||||
/// Default is ~400kb.
|
||||
///
|
||||
/// Note that setting this option unsets the `http1_read_exact_buf_size` option.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
|
||||
#[cfg(feature = "http1")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
|
||||
pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
|
||||
assert!(
|
||||
max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
|
||||
"the max_buf_size cannot be smaller than the minimum that h1 specifies."
|
||||
);
|
||||
|
||||
self.h1_max_buf_size = Some(max);
|
||||
self.h1_read_buf_exact_size = None;
|
||||
self
|
||||
}
|
||||
|
||||
/// Constructs a connection with the configured options and IO.
|
||||
/// See [`client::conn`](crate::client::conn) for more.
|
||||
///
|
||||
/// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
|
||||
/// do nothing.
|
||||
pub fn handshake<T, B>(
|
||||
&self,
|
||||
io: T,
|
||||
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
B: HttpBody + 'static,
|
||||
B::Data: Send,
|
||||
B::Error: Into<Box<dyn StdError + Send + Sync>>,
|
||||
{
|
||||
let opts = self.clone();
|
||||
|
||||
async move {
|
||||
tracing::trace!("client handshake HTTP/1");
|
||||
|
||||
let (tx, rx) = dispatch::channel();
|
||||
let mut conn = proto::Conn::new(io);
|
||||
conn.set_h1_parser_config(opts.h1_parser_config);
|
||||
if let Some(writev) = opts.h1_writev {
|
||||
if writev {
|
||||
conn.set_write_strategy_queue();
|
||||
} else {
|
||||
conn.set_write_strategy_flatten();
|
||||
}
|
||||
}
|
||||
if opts.h1_title_case_headers {
|
||||
conn.set_title_case_headers();
|
||||
}
|
||||
if opts.h1_preserve_header_case {
|
||||
conn.set_preserve_header_case();
|
||||
}
|
||||
#[cfg(feature = "ffi")]
|
||||
if opts.h1_preserve_header_order {
|
||||
conn.set_preserve_header_order();
|
||||
}
|
||||
if opts.h09_responses {
|
||||
conn.set_h09_responses();
|
||||
}
|
||||
|
||||
if let Some(sz) = opts.h1_read_buf_exact_size {
|
||||
conn.set_read_buf_exact_size(sz);
|
||||
}
|
||||
if let Some(max) = opts.h1_max_buf_size {
|
||||
conn.set_max_buf_size(max);
|
||||
}
|
||||
let cd = proto::h1::dispatch::Client::new(rx);
|
||||
let proto = proto::h1::Dispatcher::new(cd, conn);
|
||||
|
||||
Ok((
|
||||
SendRequest { dispatch: tx },
|
||||
Connection { inner: Some(proto) },
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
432
src/client/conn/http2.rs
Normal file
432
src/client/conn/http2.rs
Normal file
@@ -0,0 +1,432 @@
|
||||
//! HTTP/2 client connections
|
||||
|
||||
use std::error::Error as StdError;
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
#[cfg(feature = "runtime")]
|
||||
use std::time::Duration;
|
||||
|
||||
use http::{Request, Response};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use crate::Body;
|
||||
use crate::body::HttpBody;
|
||||
use crate::common::{
|
||||
exec::{BoxSendFuture, Exec},
|
||||
task, Future, Pin, Poll,
|
||||
};
|
||||
use crate::proto;
|
||||
use crate::rt::Executor;
|
||||
use super::super::dispatch;
|
||||
|
||||
/// The sender side of an established connection.
|
||||
pub struct SendRequest<B> {
|
||||
dispatch: dispatch::Sender<Request<B>, Response<Body>>,
|
||||
}
|
||||
|
||||
/// A future that processes all HTTP state for the IO object.
|
||||
///
|
||||
/// In most cases, this should just be spawned into an executor, so that it
|
||||
/// can process incoming and outgoing messages, notice hangups, and the like.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct Connection<T, B>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Send + 'static,
|
||||
B: HttpBody + 'static,
|
||||
{
|
||||
inner: (PhantomData<T>, proto::h2::ClientTask<B>),
|
||||
}
|
||||
|
||||
/// A builder to configure an HTTP connection.
|
||||
///
|
||||
/// After setting options, the builder is used to create a handshake future.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Builder {
|
||||
pub(super) exec: Exec,
|
||||
h2_builder: proto::h2::client::Config,
|
||||
}
|
||||
|
||||
/// Returns a handshake future over some IO.
|
||||
///
|
||||
/// This is a shortcut for `Builder::new().handshake(io)`.
|
||||
/// See [`client::conn`](crate::client::conn) for more.
|
||||
pub async fn handshake<T>(
|
||||
io: T,
|
||||
) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
Builder::new().handshake(io).await
|
||||
}
|
||||
|
||||
// ===== impl SendRequest
|
||||
|
||||
impl<B> SendRequest<B> {
|
||||
/// Polls to determine whether this sender can be used yet for a request.
|
||||
///
|
||||
/// If the associated connection is closed, this returns an Error.
|
||||
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
|
||||
self.dispatch.poll_ready(cx)
|
||||
}
|
||||
|
||||
/*
|
||||
pub(super) async fn when_ready(self) -> crate::Result<Self> {
|
||||
let mut me = Some(self);
|
||||
future::poll_fn(move |cx| {
|
||||
ready!(me.as_mut().unwrap().poll_ready(cx))?;
|
||||
Poll::Ready(Ok(me.take().unwrap()))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) fn is_ready(&self) -> bool {
|
||||
self.dispatch.is_ready()
|
||||
}
|
||||
|
||||
pub(super) fn is_closed(&self) -> bool {
|
||||
self.dispatch.is_closed()
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
impl<B> SendRequest<B>
|
||||
where
|
||||
B: HttpBody + 'static,
|
||||
{
|
||||
/// Sends a `Request` on the associated connection.
|
||||
///
|
||||
/// Returns a future that if successful, yields the `Response`.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// There are some key differences in what automatic things the `Client`
|
||||
/// does for you that will not be done here:
|
||||
///
|
||||
/// - `Client` requires absolute-form `Uri`s, since the scheme and
|
||||
/// authority are needed to connect. They aren't required here.
|
||||
/// - Since the `Client` requires absolute-form `Uri`s, it can add
|
||||
/// the `Host` header based on it. You must add a `Host` header yourself
|
||||
/// before calling this method.
|
||||
/// - Since absolute-form `Uri`s are not required, if received, they will
|
||||
/// be serialized as-is.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # use http::header::HOST;
|
||||
/// # use hyper::client::conn::SendRequest;
|
||||
/// # use hyper::Body;
|
||||
/// use hyper::Request;
|
||||
///
|
||||
/// # async fn doc(mut tx: SendRequest<Body>) -> hyper::Result<()> {
|
||||
/// // build a Request
|
||||
/// let req = Request::builder()
|
||||
/// .uri("/foo/bar")
|
||||
/// .header(HOST, "hyper.rs")
|
||||
/// .body(Body::empty())
|
||||
/// .unwrap();
|
||||
///
|
||||
/// // send it and await a Response
|
||||
/// let res = tx.send_request(req).await?;
|
||||
/// // assert the Response
|
||||
/// assert!(res.status().is_success());
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// # fn main() {}
|
||||
/// ```
|
||||
pub fn send_request(&mut self, req: Request<B>) -> impl Future<Output = crate::Result<Response<Body>>> {
|
||||
let sent = self.dispatch.send(req);
|
||||
|
||||
async move {
|
||||
match sent {
|
||||
Ok(rx) => match rx.await {
|
||||
Ok(Ok(resp)) => Ok(resp),
|
||||
Ok(Err(err)) => Err(err),
|
||||
// this is definite bug if it happens, but it shouldn't happen!
|
||||
Err(_canceled) => panic!("dispatch dropped without returning error"),
|
||||
}
|
||||
Err(_req) => {
|
||||
tracing::debug!("connection was not ready");
|
||||
|
||||
Err(crate::Error::new_canceled().with("connection was not ready"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
pub(super) fn send_request_retryable(
|
||||
&mut self,
|
||||
req: Request<B>,
|
||||
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
|
||||
where
|
||||
B: Send,
|
||||
{
|
||||
match self.dispatch.try_send(req) {
|
||||
Ok(rx) => {
|
||||
Either::Left(rx.then(move |res| {
|
||||
match res {
|
||||
Ok(Ok(res)) => future::ok(res),
|
||||
Ok(Err(err)) => future::err(err),
|
||||
// this is definite bug if it happens, but it shouldn't happen!
|
||||
Err(_) => panic!("dispatch dropped without returning error"),
|
||||
}
|
||||
}))
|
||||
}
|
||||
Err(req) => {
|
||||
tracing::debug!("connection was not ready");
|
||||
let err = crate::Error::new_canceled().with("connection was not ready");
|
||||
Either::Right(future::err((err, Some(req))))
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
impl<B> fmt::Debug for SendRequest<B> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("SendRequest").finish()
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Connection
|
||||
|
||||
impl<T, B> fmt::Debug for Connection<T, B>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
|
||||
B: HttpBody + 'static,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Connection").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, B> Future for Connection<T, B>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
B: HttpBody + Send + 'static,
|
||||
B::Data: Send,
|
||||
B::Error: Into<Box<dyn StdError + Send + Sync>>,
|
||||
{
|
||||
type Output = crate::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||
match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
|
||||
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
|
||||
#[cfg(feature = "http1")]
|
||||
proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Builder
|
||||
|
||||
impl Builder {
|
||||
/// Creates a new connection builder.
|
||||
#[inline]
|
||||
pub fn new() -> Builder {
|
||||
Builder {
|
||||
exec: Exec::Default,
|
||||
h2_builder: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Provide an executor to execute background HTTP2 tasks.
|
||||
pub fn executor<E>(&mut self, exec: E) -> &mut Builder
|
||||
where
|
||||
E: Executor<BoxSendFuture> + Send + Sync + 'static,
|
||||
{
|
||||
self.exec = Exec::Executor(Arc::new(exec));
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
|
||||
/// stream-level flow control.
|
||||
///
|
||||
/// Passing `None` will do nothing.
|
||||
///
|
||||
/// If not set, hyper will use a default.
|
||||
///
|
||||
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
|
||||
#[cfg(feature = "http2")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
||||
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
|
||||
if let Some(sz) = sz.into() {
|
||||
self.h2_builder.adaptive_window = false;
|
||||
self.h2_builder.initial_stream_window_size = sz;
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the max connection-level flow control for HTTP2
|
||||
///
|
||||
/// Passing `None` will do nothing.
|
||||
///
|
||||
/// If not set, hyper will use a default.
|
||||
#[cfg(feature = "http2")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
||||
pub fn http2_initial_connection_window_size(
|
||||
&mut self,
|
||||
sz: impl Into<Option<u32>>,
|
||||
) -> &mut Self {
|
||||
if let Some(sz) = sz.into() {
|
||||
self.h2_builder.adaptive_window = false;
|
||||
self.h2_builder.initial_conn_window_size = sz;
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets whether to use an adaptive flow control.
|
||||
///
|
||||
/// Enabling this will override the limits set in
|
||||
/// `http2_initial_stream_window_size` and
|
||||
/// `http2_initial_connection_window_size`.
|
||||
#[cfg(feature = "http2")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
||||
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
|
||||
use proto::h2::SPEC_WINDOW_SIZE;
|
||||
|
||||
self.h2_builder.adaptive_window = enabled;
|
||||
if enabled {
|
||||
self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
|
||||
self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the maximum frame size to use for HTTP2.
|
||||
///
|
||||
/// Passing `None` will do nothing.
|
||||
///
|
||||
/// If not set, hyper will use a default.
|
||||
#[cfg(feature = "http2")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
||||
pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
|
||||
if let Some(sz) = sz.into() {
|
||||
self.h2_builder.max_frame_size = sz;
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
|
||||
/// connection alive.
|
||||
///
|
||||
/// Pass `None` to disable HTTP2 keep-alive.
|
||||
///
|
||||
/// Default is currently disabled.
|
||||
///
|
||||
/// # Cargo Feature
|
||||
///
|
||||
/// Requires the `runtime` cargo feature to be enabled.
|
||||
#[cfg(feature = "runtime")]
|
||||
#[cfg(feature = "http2")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
||||
pub fn http2_keep_alive_interval(
|
||||
&mut self,
|
||||
interval: impl Into<Option<Duration>>,
|
||||
) -> &mut Self {
|
||||
self.h2_builder.keep_alive_interval = interval.into();
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
|
||||
///
|
||||
/// If the ping is not acknowledged within the timeout, the connection will
|
||||
/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
|
||||
///
|
||||
/// Default is 20 seconds.
|
||||
///
|
||||
/// # Cargo Feature
|
||||
///
|
||||
/// Requires the `runtime` cargo feature to be enabled.
|
||||
#[cfg(feature = "runtime")]
|
||||
#[cfg(feature = "http2")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
||||
pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
|
||||
self.h2_builder.keep_alive_timeout = timeout;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets whether HTTP2 keep-alive should apply while the connection is idle.
|
||||
///
|
||||
/// If disabled, keep-alive pings are only sent while there are open
|
||||
/// request/responses streams. If enabled, pings are also sent when no
|
||||
/// streams are active. Does nothing if `http2_keep_alive_interval` is
|
||||
/// disabled.
|
||||
///
|
||||
/// Default is `false`.
|
||||
///
|
||||
/// # Cargo Feature
|
||||
///
|
||||
/// Requires the `runtime` cargo feature to be enabled.
|
||||
#[cfg(feature = "runtime")]
|
||||
#[cfg(feature = "http2")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
||||
pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
|
||||
self.h2_builder.keep_alive_while_idle = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the maximum number of HTTP2 concurrent locally reset streams.
|
||||
///
|
||||
/// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
|
||||
/// details.
|
||||
///
|
||||
/// The default value is determined by the `h2` crate.
|
||||
///
|
||||
/// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
|
||||
#[cfg(feature = "http2")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
||||
pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
|
||||
self.h2_builder.max_concurrent_reset_streams = Some(max);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the maximum write buffer size for each HTTP/2 stream.
|
||||
///
|
||||
/// Default is currently 1MB, but may change.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// The value must be no larger than `u32::MAX`.
|
||||
#[cfg(feature = "http2")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
||||
pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
|
||||
assert!(max <= std::u32::MAX as usize);
|
||||
self.h2_builder.max_send_buffer_size = max;
|
||||
self
|
||||
}
|
||||
|
||||
/// Constructs a connection with the configured options and IO.
|
||||
/// See [`client::conn`](crate::client::conn) for more.
|
||||
///
|
||||
/// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
|
||||
/// do nothing.
|
||||
pub fn handshake<T, B>(
|
||||
&self,
|
||||
io: T,
|
||||
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
B: HttpBody + 'static,
|
||||
B::Data: Send,
|
||||
B::Error: Into<Box<dyn StdError + Send + Sync>>,
|
||||
{
|
||||
let opts = self.clone();
|
||||
|
||||
async move {
|
||||
tracing::trace!("client handshake HTTP/1");
|
||||
|
||||
let (tx, rx) = dispatch::channel();
|
||||
let h2 =
|
||||
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec)
|
||||
.await?;
|
||||
Ok((
|
||||
SendRequest { dispatch: tx },
|
||||
Connection { inner: (PhantomData, h2) },
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,6 +84,11 @@ use crate::rt::Executor;
|
||||
use crate::upgrade::Upgraded;
|
||||
use crate::{Body, Request, Response};
|
||||
|
||||
#[cfg(feature = "http1")]
|
||||
pub mod http1;
|
||||
#[cfg(feature = "http2")]
|
||||
pub mod http2;
|
||||
|
||||
#[cfg(feature = "http1")]
|
||||
type Http1Dispatcher<T, B> =
|
||||
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
|
||||
@@ -1,7 +1,6 @@
|
||||
#[cfg(feature = "http2")]
|
||||
use std::future::Future;
|
||||
|
||||
use futures_util::FutureExt;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
#[cfg(feature = "http2")]
|
||||
@@ -169,6 +168,7 @@ impl<T, U> Receiver<T, U> {
|
||||
|
||||
#[cfg(feature = "http1")]
|
||||
pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
|
||||
use futures_util::FutureExt;
|
||||
match self.inner.recv().now_or_never() {
|
||||
Some(Some(mut env)) => env.0.take(),
|
||||
_ => None,
|
||||
|
||||
Reference in New Issue
Block a user