style(lib): run rustfmt and enforce in CI

This commit is contained in:
Sean McArthur
2019-12-05 13:30:53 -08:00
parent b0060f277e
commit 0dc89680cd
69 changed files with 2982 additions and 2499 deletions

View File

@@ -17,19 +17,14 @@ use pin_project::{pin_project, project};
use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::Service;
use crate::body::Payload;
use crate::common::{BoxSendFuture, Exec, Executor, Future, Pin, Poll, task};
use crate::upgrade::Upgraded;
use crate::proto;
use super::dispatch;
use crate::body::Payload;
use crate::common::{task, BoxSendFuture, Exec, Executor, Future, Pin, Poll};
use crate::proto;
use crate::upgrade::Upgraded;
use crate::{Body, Request, Response};
type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
B,
T,
R,
>;
type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, R>;
#[pin_project]
enum ProtoClient<T, B>
@@ -46,18 +41,16 @@ where
const DEFAULT_HTTP2_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
const DEFAULT_HTTP2_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
/// Returns a handshake future over some IO.
///
/// This is a shortcut for `Builder::new().handshake(io)`.
pub async fn handshake<T>(io: T) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
pub async fn handshake<T>(
io: T,
) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
Builder::new()
.handshake(io)
.await
Builder::new().handshake(io).await
}
/// The sender side of an established connection.
@@ -65,7 +58,6 @@ pub struct SendRequest<B> {
dispatch: dispatch::Sender<Request<B>, Response<Body>>,
}
/// A future that processes all HTTP state for the IO object.
///
/// In most cases, this should just be spawned into an executor, so that it
@@ -79,7 +71,6 @@ where
inner: Option<ProtoClient<T, B>>,
}
/// A builder to configure an HTTP connection.
///
/// After setting options, the builder is used to create a handshake future.
@@ -99,7 +90,7 @@ pub struct Builder {
/// Yields a `Response` if successful.
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
inner: ResponseFutureState
inner: ResponseFutureState,
}
enum ResponseFutureState {
@@ -139,8 +130,7 @@ pub(super) struct Http2SendRequest<B> {
// ===== impl SendRequest
impl<B> SendRequest<B>
{
impl<B> SendRequest<B> {
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
@@ -148,7 +138,7 @@ impl<B> SendRequest<B>
self.dispatch.poll_ready(cx)
}
pub(super) fn when_ready(self) -> impl Future<Output=crate::Result<Self>> {
pub(super) fn when_ready(self) -> impl Future<Output = crate::Result<Self>> {
let mut me = Some(self);
future::poll_fn(move |cx| {
ready!(me.as_mut().unwrap().poll_ready(cx))?;
@@ -218,9 +208,7 @@ where
/// ```
pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture {
let inner = match self.dispatch.send(req) {
Ok(rx) => {
ResponseFutureState::Waiting(rx)
},
Ok(rx) => ResponseFutureState::Waiting(rx),
Err(_req) => {
debug!("connection was not ready");
let err = crate::Error::new_canceled().with("connection was not ready");
@@ -228,12 +216,13 @@ where
}
};
ResponseFuture {
inner,
}
ResponseFuture { inner }
}
pub(crate) fn send_request_retryable(&mut self, req: Request<B>) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
pub(crate) fn send_request_retryable(
&mut self,
req: Request<B>,
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
where
B: Send,
{
@@ -247,7 +236,7 @@ where
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
},
}
Err(req) => {
debug!("connection was not ready");
let err = crate::Error::new_canceled().with("connection was not ready");
@@ -259,7 +248,8 @@ where
impl<B> Service<Request<B>> for SendRequest<B>
where
B: Payload + 'static, {
B: Payload + 'static,
{
type Response = Response<Body>;
type Error = crate::Error;
type Future = ResponseFuture;
@@ -275,8 +265,7 @@ where
impl<B> fmt::Debug for SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendRequest")
.finish()
f.debug_struct("SendRequest").finish()
}
}
@@ -296,7 +285,10 @@ impl<B> Http2SendRequest<B>
where
B: Payload + 'static,
{
pub(super) fn send_request_retryable(&mut self, req: Request<B>) -> impl Future<Output=Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
pub(super) fn send_request_retryable(
&mut self,
req: Request<B>,
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
where
B: Send,
{
@@ -310,7 +302,7 @@ where
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
},
}
Err(req) => {
debug!("connection was not ready");
let err = crate::Error::new_canceled().with("connection was not ready");
@@ -322,8 +314,7 @@ where
impl<B> fmt::Debug for Http2SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Http2SendRequest")
.finish()
f.debug_struct("Http2SendRequest").finish()
}
}
@@ -374,18 +365,14 @@ where
/// to work with this function; or use the `without_shutdown` wrapper.
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
match self.inner.as_mut().expect("already upgraded") {
&mut ProtoClient::H1(ref mut h1) => {
h1.poll_without_shutdown(cx)
},
&mut ProtoClient::H2(ref mut h2) => {
Pin::new(h2).poll(cx).map_ok(|_| ())
}
&mut ProtoClient::H1(ref mut h1) => h1.poll_without_shutdown(cx),
&mut ProtoClient::H2(ref mut h2) => Pin::new(h2).poll(cx).map_ok(|_| ()),
}
}
/// Prevent shutdown of the underlying IO object at the end of service the request,
/// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
pub fn without_shutdown(self) -> impl Future<Output=crate::Result<Parts<T>>> {
pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>> {
let mut conn = Some(self);
future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
@@ -404,9 +391,7 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
proto::Dispatched::Shutdown => {
Poll::Ready(Ok(()))
},
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
proto::Dispatched::Upgrade(pending) => {
let h1 = match mem::replace(&mut self.inner, None) {
Some(ProtoClient::H1(h1)) => h1,
@@ -427,8 +412,7 @@ where
B: Payload + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection")
.finish()
f.debug_struct("Connection").finish()
}
}
@@ -519,7 +503,10 @@ impl Builder {
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
pub fn http2_initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
pub fn http2_initial_connection_window_size(
&mut self,
sz: impl Into<Option<u32>>,
) -> &mut Self {
if let Some(sz) = sz.into() {
self.h2_builder.initial_connection_window_size(sz);
}
@@ -527,7 +514,10 @@ impl Builder {
}
/// Constructs a connection with the configured options and IO.
pub fn handshake<T, B>(&self, io: T) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
pub fn handshake<T, B>(
&self,
io: T,
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Payload + 'static,
@@ -563,12 +553,8 @@ impl Builder {
};
Ok((
SendRequest {
dispatch: tx,
},
Connection {
inner: Some(proto),
},
SendRequest { dispatch: tx },
Connection { inner: Some(proto) },
))
}
}
@@ -588,7 +574,7 @@ impl Future for ResponseFuture {
// this is definite bug if it happens, but it shouldn't happen!
Err(_canceled) => panic!("dispatch dropped without returning error"),
})
},
}
ResponseFutureState::Error(ref mut err) => {
Poll::Ready(Err(err.take().expect("polled after ready")))
}
@@ -598,8 +584,7 @@ impl Future for ResponseFuture {
impl fmt::Debug for ResponseFuture {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ResponseFuture")
.finish()
f.debug_struct("ResponseFuture").finish()
}
}
@@ -628,7 +613,6 @@ where
trait AssertSend: Send {}
trait AssertSendSync: Send + Sync {}
#[doc(hidden)]
impl<B: Send> AssertSendSync for SendRequest<B> {}
@@ -637,7 +621,8 @@ impl<T: Send, B: Send> AssertSend for Connection<T, B>
where
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
{}
{
}
#[doc(hidden)]
impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
@@ -645,11 +630,11 @@ where
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
B::Data: Send + Sync + 'static,
{}
{
}
#[doc(hidden)]
impl AssertSendSync for Builder {}
#[doc(hidden)]
impl AssertSend for ResponseFuture {}

View File

@@ -22,11 +22,11 @@
//! });
//! ```
use std::error::Error;
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs};
use std::pin::Pin;
use std::str::FromStr;
use std::task::{self, Poll};
use std::pin::Pin;
use std::future::Future;
use std::{fmt, io, vec};
use tokio::task::JoinHandle;
@@ -394,7 +394,8 @@ mod tests {
let dst = ::http::Uri::from_static("http://[::1]:8080/");
let mut addrs =
IpAddrs::try_parse(dst.host().expect("host"), dst.port_u16().expect("port")).expect("try_parse");
IpAddrs::try_parse(dst.host().expect("host"), dst.port_u16().expect("port"))
.expect("try_parse");
let expected = "[::1]:8080".parse::<SocketAddr>().expect("expected");

View File

@@ -1,12 +1,12 @@
use std::error::Error as StdError;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{self, Poll};
use std::fmt;
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::net::{IpAddr, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Poll};
use std::time::Duration;
use futures_util::future::Either;
@@ -20,7 +20,6 @@ use super::dns::{self, resolve, GaiResolver, Resolve};
use super::{Connected, Connection};
//#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver;
/// A connector for the `http` scheme.
///
/// Performs DNS resolution in a thread pool, and then connects over TCP.
@@ -256,10 +255,7 @@ impl<R> HttpConnector<R>
where
R: Resolve,
{
async fn call_async(
&mut self,
dst: Uri,
) -> Result<TcpStream, ConnectError> {
async fn call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError> {
trace!(
"Http::connect; scheme={:?}, host={:?}, port={:?}",
dst.scheme(),
@@ -292,7 +288,13 @@ where
};
let port = match dst.port() {
Some(port) => port.as_u16(),
None => if dst.scheme() == Some(&Scheme::HTTPS) { 443 } else { 80 },
None => {
if dst.scheme() == Some(&Scheme::HTTPS) {
443
} else {
80
}
}
};
let config = &self.config;
@@ -348,9 +350,7 @@ impl Connection for TcpStream {
fn connected(&self) -> Connected {
let connected = Connected::new();
if let Ok(remote_addr) = self.peer_addr() {
connected.extra(HttpInfo {
remote_addr,
})
connected.extra(HttpInfo { remote_addr })
} else {
connected
}
@@ -389,7 +389,6 @@ impl<R: Resolve> Future for HttpConnecting<R> {
}
}
// Not publicly exported (so missing_docs doesn't trigger).
pub struct ConnectError {
msg: Box<str>,
@@ -531,14 +530,7 @@ impl ConnectingTcpRemote {
let mut err = None;
for addr in &mut self.addrs {
debug!("connecting to {}", addr);
match connect(
&addr,
local_addr,
reuse_address,
self.connect_timeout,
)?
.await
{
match connect(&addr, local_addr, reuse_address, self.connect_timeout)?.await {
Ok(tcp) => {
debug!("connected to {:?}", tcp.peer_addr().ok());
return Ok(tcp);
@@ -606,13 +598,9 @@ impl ConnectingTcp {
..
} = self;
match self.fallback {
None => {
self.preferred
.connect(local_addr, reuse_address)
.await
}
None => self.preferred.connect(local_addr, reuse_address).await,
Some(mut fallback) => {
let preferred_fut = self.preferred.connect(local_addr, reuse_address);
let preferred_fut = self.preferred.connect(local_addr, reuse_address);
futures_util::pin_mut!(preferred_fut);
let fallback_fut = fallback.remote.connect(local_addr, reuse_address);
@@ -652,10 +640,7 @@ mod tests {
use super::super::sealed::Connect;
use super::HttpConnector;
async fn connect<C>(
connector: C,
dst: Uri,
) -> Result<C::Transport, C::Error>
async fn connect<C>(connector: C, dst: Uri) -> Result<C::Transport, C::Error>
where
C: Connect,
{
@@ -795,7 +780,6 @@ mod tests {
continue;
}
let (start, stream) = rt
.block_on(async move {
let addrs = hosts

View File

@@ -7,11 +7,14 @@
//! - Types to build custom connectors.
use std::fmt;
use ::http::{Response};
use ::http::Response;
#[cfg(feature = "tcp")] pub mod dns;
#[cfg(feature = "tcp")] mod http;
#[cfg(feature = "tcp")] pub use self::http::{HttpConnector, HttpInfo};
#[cfg(feature = "tcp")]
pub mod dns;
#[cfg(feature = "tcp")]
mod http;
#[cfg(feature = "tcp")]
pub use self::http::{HttpConnector, HttpInfo};
/// Describes a type returned by a connector.
pub trait Connection {
@@ -115,8 +118,7 @@ impl Clone for Extra {
impl fmt::Debug for Extra {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Extra")
.finish()
f.debug_struct("Extra").finish()
}
}
@@ -133,7 +135,7 @@ struct ExtraEnvelope<T>(T);
impl<T> ExtraInner for ExtraEnvelope<T>
where
T: Clone + Send + Sync + 'static
T: Clone + Send + Sync + 'static,
{
fn clone_box(&self) -> Box<dyn ExtraInner> {
Box::new(self.clone())
@@ -154,7 +156,7 @@ impl<T: Clone> Clone for ExtraChain<T> {
impl<T> ExtraInner for ExtraChain<T>
where
T: Clone + Send + Sync + 'static
T: Clone + Send + Sync + 'static,
{
fn clone_box(&self) -> Box<dyn ExtraInner> {
Box::new(self.clone())
@@ -172,8 +174,8 @@ pub(super) mod sealed {
use ::http::Uri;
use tokio::io::{AsyncRead, AsyncWrite};
use super::Connection;
use crate::common::{Future, Unpin};
use super::{Connection};
/// Connect to a destination, returning an IO transport.
///
@@ -193,14 +195,14 @@ pub(super) mod sealed {
/// An error occured when trying to connect.
type Error: Into<Box<dyn StdError + Send + Sync>>;
/// A Future that will resolve to the connected Transport.
type Future: Future<Output=Result<Self::Transport, Self::Error>>;
type Future: Future<Output = Result<Self::Transport, Self::Error>>;
#[doc(hidden)]
fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future;
}
impl<S, T> Connect for S
where
S: tower_service::Service<Uri, Response=T> + Send,
S: tower_service::Service<Uri, Response = T> + Send,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
@@ -215,11 +217,12 @@ pub(super) mod sealed {
impl<S, T> Sealed for S
where
S: tower_service::Service<Uri, Response=T> + Send,
S: tower_service::Service<Uri, Response = T> + Send,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{}
{
}
pub trait Sealed {}
#[allow(missing_debug_implementations)]
@@ -228,7 +231,7 @@ pub(super) mod sealed {
#[cfg(test)]
mod tests {
use super::{Connected};
use super::Connected;
#[derive(Clone, Debug, PartialEq)]
struct Ex1(usize);
@@ -241,18 +244,13 @@ mod tests {
#[test]
fn test_connected_extra() {
let c1 = Connected::new()
.extra(Ex1(41));
let c1 = Connected::new().extra(Ex1(41));
let mut res1 = crate::Response::new(crate::Body::empty());
assert_eq!(res1.extensions().get::<Ex1>(), None);
c1
.extra
.as_ref()
.expect("c1 extra")
.set(&mut res1);
c1.extra.as_ref().expect("c1 extra").set(&mut res1);
assert_eq!(res1.extensions().get::<Ex1>(), Some(&Ex1(41)));
}
@@ -273,11 +271,7 @@ mod tests {
assert_eq!(res1.extensions().get::<Ex2>(), None);
assert_eq!(res1.extensions().get::<Ex3>(), None);
c1
.extra
.as_ref()
.expect("c1 extra")
.set(&mut res1);
c1.extra.as_ref().expect("c1 extra").set(&mut res1);
assert_eq!(res1.extensions().get::<Ex1>(), Some(&Ex1(45)));
assert_eq!(res1.extensions().get::<Ex2>(), Some(&Ex2("zoom")));
@@ -291,11 +285,7 @@ mod tests {
let mut res2 = crate::Response::new(crate::Body::empty());
c2
.extra
.as_ref()
.expect("c2 extra")
.set(&mut res2);
c2.extra.as_ref().expect("c2 extra").set(&mut res2);
assert_eq!(res2.extensions().get::<Ex1>(), Some(&Ex1(99)));
assert_eq!(res2.extensions().get::<Ex2>(), Some(&Ex2("hiccup")));

View File

@@ -1,8 +1,8 @@
use futures_core::Stream;
use futures_channel::{mpsc, oneshot};
use futures_core::Stream;
use futures_util::future;
use crate::common::{Future, Pin, Poll, task};
use crate::common::{task, Future, Pin, Poll};
pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
pub type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;
@@ -52,7 +52,8 @@ pub struct UnboundedSender<T, U> {
impl<T, U> Sender<T, U> {
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
self.giver.poll_want(cx)
self.giver
.poll_want(cx)
.map_err(|_| crate::Error::new_closed())
}
@@ -82,7 +83,8 @@ impl<T, U> Sender<T, U> {
return Err(val);
}
let (tx, rx) = oneshot::channel();
self.inner.unbounded_send(Envelope(Some((val, Callback::Retry(tx)))))
self.inner
.unbounded_send(Envelope(Some((val, Callback::Retry(tx)))))
.map(move |_| rx)
.map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0)
}
@@ -92,7 +94,8 @@ impl<T, U> Sender<T, U> {
return Err(val);
}
let (tx, rx) = oneshot::channel();
self.inner.unbounded_send(Envelope(Some((val, Callback::NoRetry(tx)))))
self.inner
.unbounded_send(Envelope(Some((val, Callback::NoRetry(tx)))))
.map(move |_| rx)
.map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0)
}
@@ -116,7 +119,8 @@ impl<T, U> UnboundedSender<T, U> {
pub fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner.unbounded_send(Envelope(Some((val, Callback::Retry(tx)))))
self.inner
.unbounded_send(Envelope(Some((val, Callback::Retry(tx)))))
.map(move |_| rx)
.map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0)
}
@@ -137,15 +141,18 @@ pub struct Receiver<T, U> {
}
impl<T, U> Receiver<T, U> {
pub(crate) fn poll_next(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> {
pub(crate) fn poll_next(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<(T, Callback<T, U>)>> {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(item) => Poll::Ready(item.map(|mut env| {
env.0.take().expect("envelope not dropped")
})),
Poll::Ready(item) => {
Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
}
Poll::Pending => {
self.taker.want();
Poll::Pending
},
}
}
}
@@ -176,7 +183,10 @@ struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
impl<T, U> Drop for Envelope<T, U> {
fn drop(&mut self) {
if let Some((val, cb)) = self.0.take() {
let _ = cb.send(Err((crate::Error::new_canceled().with("connection closed"), Some(val))));
let _ = cb.send(Err((
crate::Error::new_canceled().with("connection closed"),
Some(val),
)));
}
}
}
@@ -205,7 +215,7 @@ impl<T, U> Callback<T, U> {
match self {
Callback::Retry(tx) => {
let _ = tx.send(val);
},
}
Callback::NoRetry(tx) => {
let _ = tx.send(val.map_err(|e| e.0));
}
@@ -214,29 +224,25 @@ impl<T, U> Callback<T, U> {
pub(crate) fn send_when(
self,
mut when: impl Future<Output=Result<U, (crate::Error, Option<T>)>> + Unpin,
) -> impl Future<Output=()> {
mut when: impl Future<Output = Result<U, (crate::Error, Option<T>)>> + Unpin,
) -> impl Future<Output = ()> {
let mut cb = Some(self);
// "select" on this callback being canceled, and the future completing
future::poll_fn(move |cx| {
match Pin::new(&mut when).poll(cx) {
Poll::Ready(Ok(res)) => {
cb.take()
.expect("polled after complete")
.send(Ok(res));
cb.take().expect("polled after complete").send(Ok(res));
Poll::Ready(())
},
}
Poll::Pending => {
// check if the callback is canceled
ready!(cb.as_mut().unwrap().poll_canceled(cx));
trace!("send_when canceled");
Poll::Ready(())
},
}
Poll::Ready(Err(err)) => {
cb.take()
.expect("polled after complete")
.send(Err(err));
cb.take().expect("polled after complete").send(Err(err));
Poll::Ready(())
}
}
@@ -253,7 +259,7 @@ mod tests {
use std::pin::Pin;
use std::task::{Context, Poll};
use super::{Callback, channel, Receiver};
use super::{channel, Callback, Receiver};
#[derive(Debug)]
struct Custom(i32);
@@ -271,14 +277,14 @@ mod tests {
impl<F, T> Future for PollOnce<'_, F>
where
F: Future<Output = T> + Unpin
F: Future<Output = T> + Unpin,
{
type Output = Option<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Ready(_) => Poll::Ready(Some(())),
Poll::Pending => Poll::Ready(None)
Poll::Pending => Poll::Ready(None),
}
}
}
@@ -357,7 +363,7 @@ mod tests {
let poll_once = PollOnce(&mut rx);
let opt = poll_once.await;
if opt.is_none() {
break
break;
}
}
});

View File

@@ -64,17 +64,18 @@ use std::sync::Arc;
use std::time::Duration;
use futures_channel::oneshot;
use futures_util::future::{self, FutureExt as _, TryFutureExt as _, Either};
use http::{Method, Request, Response, Uri, Version};
use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
use http::header::{HeaderValue, HOST};
use http::uri::Scheme;
use http::{Method, Request, Response, Uri, Version};
use crate::body::{Body, Payload};
use crate::common::{lazy as hyper_lazy, BoxSendFuture, Executor, Lazy, Future, Pin, Poll, task};
use self::connect::{Alpn, sealed::Connect, Connected, Connection};
use self::connect::{sealed::Connect, Alpn, Connected, Connection};
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
use crate::body::{Body, Payload};
use crate::common::{lazy as hyper_lazy, task, BoxSendFuture, Executor, Future, Lazy, Pin, Poll};
#[cfg(feature = "tcp")] pub use self::connect::HttpConnector;
#[cfg(feature = "tcp")]
pub use self::connect::HttpConnector;
pub mod conn;
pub mod connect;
@@ -104,7 +105,7 @@ struct Config {
/// This is returned by `Client::request` (and `Client::get`).
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
inner: Pin<Box<dyn Future<Output=crate::Result<Response<Body>>> + Send>>,
inner: Pin<Box<dyn Future<Output = crate::Result<Response<Body>>> + Send>>,
}
// ===== impl Client =====
@@ -157,11 +158,12 @@ impl Client<(), Body> {
}
impl<C, B> Client<C, B>
where C: Connect + Clone + Send + Sync + 'static,
C::Transport: Unpin + Send + 'static,
C::Future: Unpin + Send + 'static,
B: Payload + Unpin + Send + 'static,
B::Data: Send + Unpin,
where
C: Connect + Clone + Send + Sync + 'static,
C::Transport: Unpin + Send + 'static,
C::Future: Unpin + Send + 'static,
B: Payload + Unpin + Send + 'static,
B::Data: Send + Unpin,
{
/// Send a `GET` request to the supplied `Uri`.
///
@@ -223,13 +225,19 @@ where C: Connect + Clone + Send + Sync + 'static,
let is_http_connect = req.method() == &Method::CONNECT;
match req.version() {
Version::HTTP_11 => (),
Version::HTTP_10 => if is_http_connect {
warn!("CONNECT is not allowed for HTTP/1.0");
return ResponseFuture::new(Box::new(future::err(crate::Error::new_user_unsupported_request_method())));
},
other_h2 @ Version::HTTP_2 => if self.config.ver != Ver::Http2 {
return ResponseFuture::error_version(other_h2);
},
Version::HTTP_10 => {
if is_http_connect {
warn!("CONNECT is not allowed for HTTP/1.0");
return ResponseFuture::new(Box::new(future::err(
crate::Error::new_user_unsupported_request_method(),
)));
}
}
other_h2 @ Version::HTTP_2 => {
if self.config.ver != Ver::Http2 {
return ResponseFuture::error_version(other_h2);
}
}
// completely unsupported HTTP version (like HTTP/0.9)!
other => return ResponseFuture::error_version(other),
};
@@ -245,7 +253,11 @@ where C: Connect + Clone + Send + Sync + 'static,
ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key)))
}
fn retryably_send_request(&self, req: Request<B>, pool_key: PoolKey) -> impl Future<Output=crate::Result<Response<Body>>> {
fn retryably_send_request(
&self,
req: Request<B>,
pool_key: PoolKey,
) -> impl Future<Output = crate::Result<Response<Body>>> {
let client = self.clone();
let uri = req.uri().clone();
@@ -265,7 +277,10 @@ where C: Connect + Clone + Send + Sync + 'static,
return Poll::Ready(Err(reason));
}
trace!("unstarted request canceled, trying again (reason={:?})", reason);
trace!(
"unstarted request canceled, trying again (reason={:?})",
reason
);
*req.uri_mut() = uri.clone();
send_fut = client.send_request(req, pool_key.clone());
}
@@ -273,7 +288,11 @@ where C: Connect + Clone + Send + Sync + 'static,
})
}
fn send_request(&self, mut req: Request<B>, pool_key: PoolKey) -> impl Future<Output=Result<Response<Body>, ClientError<B>>> + Unpin {
fn send_request(
&self,
mut req: Request<B>,
pool_key: PoolKey,
) -> impl Future<Output = Result<Response<Body>, ClientError<B>>> + Unpin {
let conn = self.connection_for(req.uri().clone(), pool_key);
let set_host = self.config.set_host;
@@ -282,18 +301,16 @@ where C: Connect + Clone + Send + Sync + 'static,
if pooled.is_http1() {
if set_host {
let uri = req.uri().clone();
req
.headers_mut()
.entry(HOST)
.or_insert_with(|| {
let hostname = uri.host().expect("authority implies host");
if let Some(port) = uri.port() {
let s = format!("{}:{}", hostname, port);
HeaderValue::from_str(&s)
} else {
HeaderValue::from_str(hostname)
}.expect("uri host is valid header value")
});
req.headers_mut().entry(HOST).or_insert_with(|| {
let hostname = uri.host().expect("authority implies host");
if let Some(port) = uri.port() {
let s = format!("{}:{}", hostname, port);
HeaderValue::from_str(&s)
} else {
HeaderValue::from_str(hostname)
}
.expect("uri host is valid header value")
});
}
// CONNECT always sends authority-form, so check it first...
@@ -306,10 +323,13 @@ where C: Connect + Clone + Send + Sync + 'static,
};
} else if req.method() == &Method::CONNECT {
debug!("client does not support CONNECT requests over HTTP2");
return Either::Left(future::err(ClientError::Normal(crate::Error::new_user_unsupported_request_method())));
return Either::Left(future::err(ClientError::Normal(
crate::Error::new_user_unsupported_request_method(),
)));
}
let fut = pooled.send_request_retryable(req)
let fut = pooled
.send_request_retryable(req)
.map_err(ClientError::map_with_reused(pooled.is_reused()));
// If the Connector included 'extra' info, add to Response...
@@ -332,51 +352,46 @@ where C: Connect + Clone + Send + Sync + 'static,
return Either::Right(Either::Left(fut));
}
Either::Right(Either::Right(fut
.map_ok(move |mut res| {
// If pooled is HTTP/2, we can toss this reference immediately.
//
// when pooled is dropped, it will try to insert back into the
// pool. To delay that, spawn a future that completes once the
// sender is ready again.
//
// This *should* only be once the related `Connection` has polled
// for a new request to start.
//
// It won't be ready if there is a body to stream.
if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
drop(pooled);
} else if !res.body().is_end_stream() {
let (delayed_tx, delayed_rx) = oneshot::channel();
res.body_mut().delayed_eof(delayed_rx);
let on_idle = future::poll_fn(move |cx| {
pooled.poll_ready(cx)
})
.map(move |_| {
// At this point, `pooled` is dropped, and had a chance
// to insert into the pool (if conn was idle)
drop(delayed_tx);
});
Either::Right(Either::Right(fut.map_ok(move |mut res| {
// If pooled is HTTP/2, we can toss this reference immediately.
//
// when pooled is dropped, it will try to insert back into the
// pool. To delay that, spawn a future that completes once the
// sender is ready again.
//
// This *should* only be once the related `Connection` has polled
// for a new request to start.
//
// It won't be ready if there is a body to stream.
if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
drop(pooled);
} else if !res.body().is_end_stream() {
let (delayed_tx, delayed_rx) = oneshot::channel();
res.body_mut().delayed_eof(delayed_rx);
let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
// At this point, `pooled` is dropped, and had a chance
// to insert into the pool (if conn was idle)
drop(delayed_tx);
});
executor.execute(on_idle);
} else {
// There's no body to delay, but the connection isn't
// ready yet. Only re-insert when it's ready
let on_idle = future::poll_fn(move |cx| {
pooled.poll_ready(cx)
})
.map(|_| ());
executor.execute(on_idle);
} else {
// There's no body to delay, but the connection isn't
// ready yet. Only re-insert when it's ready
let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
executor.execute(on_idle);
}
res
})))
executor.execute(on_idle);
}
res
})))
})
}
fn connection_for(&self, uri: Uri, pool_key: PoolKey)
-> impl Future<Output=Result<Pooled<PoolClient<B>>, ClientError<B>>>
{
fn connection_for(
&self,
uri: Uri,
pool_key: PoolKey,
) -> impl Future<Output = Result<Pooled<PoolClient<B>>, ClientError<B>>> {
// This actually races 2 different futures to try to get a ready
// connection the fastest, and to reduce connection churn.
//
@@ -395,67 +410,66 @@ where C: Connect + Clone + Send + Sync + 'static,
let executor = self.conn_builder.exec.clone();
// The order of the `select` is depended on below...
future::select(checkout, connect)
.then(move |either| match either {
// Checkout won, connect future may have been started or not.
future::select(checkout, connect).then(move |either| match either {
// Checkout won, connect future may have been started or not.
//
// If it has, let it finish and insert back into the pool,
// so as to not waste the socket...
Either::Left((Ok(checked_out), connecting)) => {
// This depends on the `select` above having the correct
// order, such that if the checkout future were ready
// immediately, the connect future will never have been
// started.
//
// If it has, let it finish and insert back into the pool,
// so as to not waste the socket...
Either::Left((Ok(checked_out), connecting)) => {
// This depends on the `select` above having the correct
// order, such that if the checkout future were ready
// immediately, the connect future will never have been
// started.
//
// If it *wasn't* ready yet, then the connect future will
// have been started...
if connecting.started() {
let bg = connecting
.map_err(|err| {
trace!("background connect error: {}", err);
})
.map(|_pooled| {
// dropping here should just place it in
// the Pool for us...
});
// An execute error here isn't important, we're just trying
// to prevent a waste of a socket...
let _ = executor.execute(bg);
}
Either::Left(future::ok(checked_out))
},
// Connect won, checkout can just be dropped.
Either::Right((Ok(connected), _checkout)) => {
Either::Left(future::ok(connected))
},
// Either checkout or connect could get canceled:
//
// 1. Connect is canceled if this is HTTP/2 and there is
// an outstanding HTTP/2 connecting task.
// 2. Checkout is canceled if the pool cannot deliver an
// idle connection reliably.
//
// In both cases, we should just wait for the other future.
Either::Left((Err(err), connecting)) => Either::Right(Either::Left({
if err.is_canceled() {
Either::Left(connecting.map_err(ClientError::Normal))
} else {
Either::Right(future::err(ClientError::Normal(err)))
}
})),
Either::Right((Err(err), checkout)) => Either::Right(Either::Right({
if err.is_canceled() {
Either::Left(checkout.map_err(ClientError::Normal))
} else {
Either::Right(future::err(ClientError::Normal(err)))
}
})),
})
// If it *wasn't* ready yet, then the connect future will
// have been started...
if connecting.started() {
let bg = connecting
.map_err(|err| {
trace!("background connect error: {}", err);
})
.map(|_pooled| {
// dropping here should just place it in
// the Pool for us...
});
// An execute error here isn't important, we're just trying
// to prevent a waste of a socket...
let _ = executor.execute(bg);
}
Either::Left(future::ok(checked_out))
}
// Connect won, checkout can just be dropped.
Either::Right((Ok(connected), _checkout)) => Either::Left(future::ok(connected)),
// Either checkout or connect could get canceled:
//
// 1. Connect is canceled if this is HTTP/2 and there is
// an outstanding HTTP/2 connecting task.
// 2. Checkout is canceled if the pool cannot deliver an
// idle connection reliably.
//
// In both cases, we should just wait for the other future.
Either::Left((Err(err), connecting)) => Either::Right(Either::Left({
if err.is_canceled() {
Either::Left(connecting.map_err(ClientError::Normal))
} else {
Either::Right(future::err(ClientError::Normal(err)))
}
})),
Either::Right((Err(err), checkout)) => Either::Right(Either::Right({
if err.is_canceled() {
Either::Left(checkout.map_err(ClientError::Normal))
} else {
Either::Right(future::err(ClientError::Normal(err)))
}
})),
})
}
fn connect_to(&self, uri: Uri, pool_key: PoolKey)
-> impl Lazy<Output=crate::Result<Pooled<PoolClient<B>>>> + Unpin
{
fn connect_to(
&self,
uri: Uri,
pool_key: PoolKey,
) -> impl Lazy<Output = crate::Result<Pooled<PoolClient<B>>>> + Unpin {
let executor = self.conn_builder.exec.clone();
let pool = self.pool.clone();
let mut conn_builder = self.conn_builder.clone();
@@ -472,68 +486,82 @@ where C: Connect + Clone + Send + Sync + 'static,
let connecting = match pool.connecting(&pool_key, ver) {
Some(lock) => lock,
None => {
let canceled = crate::Error::new_canceled().with("HTTP/2 connection in progress");
let canceled =
crate::Error::new_canceled().with("HTTP/2 connection in progress");
return Either::Right(future::err(canceled));
}
};
Either::Left(connector.connect(connect::sealed::Internal, dst)
.map_err(crate::Error::new_connect)
.and_then(move |io| {
let connected = io.connected();
// If ALPN is h2 and we aren't http2_only already,
// then we need to convert our pool checkout into
// a single HTTP2 one.
let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
match connecting.alpn_h2(&pool) {
Some(lock) => {
trace!("ALPN negotiated h2, updating pool");
lock
},
None => {
// Another connection has already upgraded,
// the pool checkout should finish up for us.
let canceled = crate::Error::new_canceled().with("ALPN upgraded to HTTP/2");
return Either::Right(future::err(canceled));
Either::Left(
connector
.connect(connect::sealed::Internal, dst)
.map_err(crate::Error::new_connect)
.and_then(move |io| {
let connected = io.connected();
// If ALPN is h2 and we aren't http2_only already,
// then we need to convert our pool checkout into
// a single HTTP2 one.
let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
match connecting.alpn_h2(&pool) {
Some(lock) => {
trace!("ALPN negotiated h2, updating pool");
lock
}
None => {
// Another connection has already upgraded,
// the pool checkout should finish up for us.
let canceled = crate::Error::new_canceled()
.with("ALPN upgraded to HTTP/2");
return Either::Right(future::err(canceled));
}
}
}
} else {
connecting
};
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
Either::Left(Box::pin(conn_builder
.http2_only(is_h2)
.handshake(io)
.and_then(move |(tx, conn)| {
trace!("handshake complete, spawning background dispatcher task");
executor.execute(conn.map_err(|e| {
debug!("client connection error: {}", e)
}).map(|_| ()));
} else {
connecting
};
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
Either::Left(Box::pin(
conn_builder
.http2_only(is_h2)
.handshake(io)
.and_then(move |(tx, conn)| {
trace!(
"handshake complete, spawning background dispatcher task"
);
executor.execute(
conn.map_err(|e| debug!("client connection error: {}", e))
.map(|_| ()),
);
// Wait for 'conn' to ready up before we
// declare this tx as usable
tx.when_ready()
})
.map_ok(move |tx| {
pool.pooled(connecting, PoolClient {
conn_info: connected,
tx: if is_h2 {
PoolTx::Http2(tx.into_http2())
} else {
PoolTx::Http1(tx)
},
})
})))
}))
// Wait for 'conn' to ready up before we
// declare this tx as usable
tx.when_ready()
})
.map_ok(move |tx| {
pool.pooled(
connecting,
PoolClient {
conn_info: connected,
tx: if is_h2 {
PoolTx::Http2(tx.into_http2())
} else {
PoolTx::Http1(tx)
},
},
)
}),
))
}),
)
})
}
}
impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
where C: Connect + Clone + Send + Sync + 'static,
C::Transport: Unpin + Send + 'static,
C::Future: Unpin + Send + 'static,
B: Payload + Unpin + Send + 'static,
B::Data: Send + Unpin,
where
C: Connect + Clone + Send + Sync + 'static,
C::Transport: Unpin + Send + 'static,
C::Future: Unpin + Send + 'static,
B: Payload + Unpin + Send + 'static,
B::Data: Send + Unpin,
{
type Response = Response<Body>;
type Error = crate::Error;
@@ -561,23 +589,22 @@ impl<C: Clone, B> Clone for Client<C, B> {
impl<C, B> fmt::Debug for Client<C, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Client")
.finish()
f.debug_struct("Client").finish()
}
}
// ===== impl ResponseFuture =====
impl ResponseFuture {
fn new(fut: Box<dyn Future<Output=crate::Result<Response<Body>>> + Send>) -> Self {
Self {
inner: fut.into(),
}
fn new(fut: Box<dyn Future<Output = crate::Result<Response<Body>>> + Send>) -> Self {
Self { inner: fut.into() }
}
fn error_version(ver: Version) -> Self {
warn!("Request has unsupported version \"{:?}\"", ver);
ResponseFuture::new(Box::new(future::err(crate::Error::new_user_unsupported_version())))
ResponseFuture::new(Box::new(future::err(
crate::Error::new_user_unsupported_version(),
)))
}
}
@@ -644,7 +671,10 @@ impl<B> PoolClient<B> {
}
impl<B: Payload + 'static> PoolClient<B> {
fn send_request_retryable(&mut self, req: Request<B>) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
fn send_request_retryable(
&mut self,
req: Request<B>,
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
where
B: Send,
{
@@ -668,12 +698,10 @@ where
fn reserve(self) -> Reservation<Self> {
match self.tx {
PoolTx::Http1(tx) => {
Reservation::Unique(PoolClient {
conn_info: self.conn_info,
tx: PoolTx::Http1(tx),
})
},
PoolTx::Http1(tx) => Reservation::Unique(PoolClient {
conn_info: self.conn_info,
tx: PoolTx::Http1(tx),
}),
PoolTx::Http2(tx) => {
let b = PoolClient {
conn_info: self.conn_info.clone(),
@@ -703,13 +731,11 @@ enum ClientError<B> {
connection_reused: bool,
req: Request<B>,
reason: crate::Error,
}
},
}
impl<B> ClientError<B> {
fn map_with_reused(conn_reused: bool)
-> impl Fn((crate::Error, Option<Request<B>>)) -> Self
{
fn map_with_reused(conn_reused: bool) -> impl Fn((crate::Error, Option<Request<B>>)) -> Self {
move |(err, orig_req)| {
if let Some(req) = orig_req {
ClientError::Canceled {
@@ -737,7 +763,7 @@ fn origin_form(uri: &mut Uri) {
let mut parts = ::http::uri::Parts::default();
parts.path_and_query = Some(path.clone());
Uri::from_parts(parts).expect("path is valid uri")
},
}
_none_or_just_slash => {
debug_assert!(Uri::default() == "/");
Uri::default()
@@ -748,7 +774,10 @@ fn origin_form(uri: &mut Uri) {
fn absolute_form(uri: &mut Uri) {
debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
debug_assert!(uri.authority().is_some(), "absolute_form needs an authority");
debug_assert!(
uri.authority().is_some(),
"absolute_form needs an authority"
);
// If the URI is to HTTPS, and the connector claimed to be a proxy,
// then it *should* have tunneled, and so we don't want to send
// absolute-form in that case.
@@ -763,10 +792,7 @@ fn authority_form(uri: &mut Uri) {
// `https://hyper.rs` would parse with `/` path, don't
// annoy people about that...
if path != "/" {
warn!(
"HTTP/1.1 CONNECT request stripping path: {:?}",
path
);
warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
}
}
}
@@ -775,7 +801,7 @@ fn authority_form(uri: &mut Uri) {
let mut parts = ::http::uri::Parts::default();
parts.authority = Some(auth.clone());
Uri::from_parts(parts).expect("authority is valid")
},
}
None => {
unreachable!("authority_form with relative uri");
}
@@ -785,9 +811,7 @@ fn authority_form(uri: &mut Uri) {
fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> crate::Result<String> {
let uri_clone = uri.clone();
match (uri_clone.scheme(), uri_clone.authority()) {
(Some(scheme), Some(auth)) => {
Ok(format!("{}://{}", scheme, auth))
}
(Some(scheme), Some(auth)) => Ok(format!("{}://{}", scheme, auth)),
(None, Some(auth)) if is_http_connect => {
let scheme = match auth.port_u16() {
Some(443) => {
@@ -797,10 +821,10 @@ fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> crate::Result<String>
_ => {
set_scheme(uri, Scheme::HTTP);
"http"
},
}
};
Ok(format!("{}://{}", scheme, auth))
},
}
_ => {
debug!("Client requires absolute-form URIs, received: {:?}", uri);
Err(crate::Error::new_user_absolute_uri_required())
@@ -809,7 +833,10 @@ fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> crate::Result<String>
}
fn set_scheme(uri: &mut Uri, scheme: Scheme) {
debug_assert!(uri.scheme().is_none(), "set_scheme expects no existing scheme");
debug_assert!(
uri.scheme().is_none(),
"set_scheme expects no existing scheme"
);
let old = mem::replace(uri, Uri::default());
let mut parts: ::http::uri::Parts = old.into();
parts.scheme = Some(scheme);
@@ -946,11 +973,7 @@ impl Builder {
///
/// Default is false.
pub fn http2_only(&mut self, val: bool) -> &mut Self {
self.client_config.ver = if val {
Ver::Http2
} else {
Ver::Auto
};
self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
self
}
@@ -963,7 +986,8 @@ impl Builder {
///
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
self.conn_builder.http2_initial_stream_window_size(sz.into());
self.conn_builder
.http2_initial_stream_window_size(sz.into());
self
}
@@ -972,8 +996,12 @@ impl Builder {
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
pub fn http2_initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
self.conn_builder.http2_initial_connection_window_size(sz.into());
pub fn http2_initial_connection_window_size(
&mut self,
sz: impl Into<Option<u32>>,
) -> &mut Self {
self.conn_builder
.http2_initial_connection_window_size(sz.into());
self
}

View File

@@ -10,8 +10,8 @@ use futures_channel::oneshot;
#[cfg(feature = "runtime")]
use tokio::time::{Duration, Instant, Interval};
use crate::common::{Exec, Future, Pin, Poll, Unpin, task};
use super::Ver;
use crate::common::{task, Exec, Future, Pin, Poll, Unpin};
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
@@ -96,7 +96,7 @@ pub(super) struct Config {
impl<T> Pool<T> {
pub fn new(config: Config, __exec: &Exec) -> Pool<T> {
let inner = if config.enabled {
Some(Arc::new(Mutex::new(PoolInner {
Some(Arc::new(Mutex::new(PoolInner {
connecting: HashSet::new(),
idle: HashMap::new(),
#[cfg(feature = "runtime")]
@@ -106,14 +106,12 @@ impl<T> Pool<T> {
#[cfg(feature = "runtime")]
exec: __exec.clone(),
timeout: config.keep_alive_timeout,
})))
})))
} else {
None
};
Pool {
inner,
}
Pool { inner }
}
fn is_enabled(&self) -> bool {
@@ -174,12 +172,7 @@ impl<T: Poolable> Pool<T> {
#[cfg(test)]
fn locked(&self) -> ::std::sync::MutexGuard<'_, PoolInner<T>> {
self
.inner
.as_ref()
.expect("enabled")
.lock()
.expect("lock")
self.inner.as_ref().expect("enabled").lock().expect("lock")
}
/* Used in client/tests.rs...
@@ -216,13 +209,13 @@ impl<T: Poolable> Pool<T> {
// Shared reservations don't need a reference to the pool,
// since the pool always keeps a copy.
(to_return, WeakOpt::none())
},
}
Reservation::Unique(value) => {
// Unique reservations must take a reference to the pool
// since they hope to reinsert once the reservation is
// completed
(value, WeakOpt::downgrade(enabled))
},
}
}
} else {
// If pool is not enabled, skip all the things...
@@ -236,7 +229,7 @@ impl<T: Poolable> Pool<T> {
key: connecting.key.clone(),
is_reused: false,
pool: pool_ref,
value: Some(value)
value: Some(value),
}
}
@@ -299,10 +292,8 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
value: to_reinsert,
});
to_checkout
},
Reservation::Unique(unique) => {
unique
}
Reservation::Unique(unique) => unique,
};
return Some(Idle {
@@ -332,7 +323,7 @@ impl<T: Poolable> PoolInner<T> {
Reservation::Shared(to_keep, to_send) => {
value = Some(to_keep);
to_send
},
}
Reservation::Unique(uniq) => uniq,
};
match tx.send(reserved) {
@@ -342,7 +333,7 @@ impl<T: Poolable> PoolInner<T> {
} else {
continue;
}
},
}
Err(e) => {
value = Some(e);
}
@@ -361,10 +352,7 @@ impl<T: Poolable> PoolInner<T> {
Some(value) => {
// borrow-check scope...
{
let idle_list = self
.idle
.entry(key.clone())
.or_insert(Vec::new());
let idle_list = self.idle.entry(key.clone()).or_insert(Vec::new());
if self.max_idle_per_host <= idle_list.len() {
trace!("max idle per host for {:?}, dropping connection", key);
return;
@@ -390,10 +378,7 @@ impl<T: Poolable> PoolInner<T> {
/// but the lock is going away, so clean up.
fn connected(&mut self, key: &Key) {
let existed = self.connecting.remove(key);
debug_assert!(
existed,
"Connecting dropped, key not in pool.connecting"
);
debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
// cancel any waiters. if there are any, it's because
// this Connecting task didn't complete successfully.
// those waiters would never receive a connection.
@@ -412,7 +397,7 @@ impl<T: Poolable> PoolInner<T> {
self.idle_interval_ref = Some(tx);
(dur, rx)
} else {
return
return;
}
};
@@ -434,9 +419,7 @@ impl<T> PoolInner<T> {
fn clean_waiters(&mut self, key: &Key) {
let mut remove_waiters = false;
if let Some(waiters) = self.waiters.get_mut(key) {
waiters.retain(|tx| {
!tx.is_canceled()
});
waiters.retain(|tx| !tx.is_canceled());
remove_waiters = waiters.is_empty();
}
if remove_waiters {
@@ -547,9 +530,7 @@ impl<T: Poolable> Drop for Pooled<T> {
impl<T: Poolable> fmt::Debug for Pooled<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Pooled")
.field("key", &self.key)
.finish()
f.debug_struct("Pooled").field("key", &self.key).finish()
}
}
@@ -567,7 +548,10 @@ pub(super) struct Checkout<T> {
}
impl<T: Poolable> Checkout<T> {
fn poll_waiter(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Pooled<T>>>> {
fn poll_waiter(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<crate::Result<Pooled<T>>>> {
static CANCELED: &str = "pool checkout failed";
if let Some(mut rx) = self.waiter.take() {
match Pin::new(&mut rx).poll(cx) {
@@ -577,12 +561,14 @@ impl<T: Poolable> Checkout<T> {
} else {
Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
}
},
}
Poll::Pending => {
self.waiter = Some(rx);
Poll::Pending
},
Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED)))),
}
Poll::Ready(Err(_canceled)) => {
Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
}
}
} else {
Poll::Ready(None)
@@ -593,20 +579,19 @@ impl<T: Poolable> Checkout<T> {
let entry = {
let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
let expiration = Expiration::new(inner.timeout);
let maybe_entry = inner.idle.get_mut(&self.key)
.and_then(|list| {
trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
// A block to end the mutable borrow on list,
// so the map below can check is_empty()
{
let popper = IdlePopper {
key: &self.key,
list,
};
popper.pop(&expiration)
}
.map(|e| (e, list.is_empty()))
});
let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
// A block to end the mutable borrow on list,
// so the map below can check is_empty()
{
let popper = IdlePopper {
key: &self.key,
list,
};
popper.pop(&expiration)
}
.map(|e| (e, list.is_empty()))
});
let (entry, empty) = if let Some((e, empty)) = maybe_entry {
(Some(e), empty)
@@ -764,9 +749,7 @@ impl<T> WeakOpt<T> {
}
fn upgrade(&self) -> Option<Arc<T>> {
self.0
.as_ref()
.and_then(Weak::upgrade)
self.0.as_ref().and_then(Weak::upgrade)
}
}
@@ -776,8 +759,8 @@ mod tests {
use std::task::Poll;
use std::time::Duration;
use crate::common::{Exec, Future, Pin, task};
use super::{Connecting, Key, Poolable, Pool, Reservation, WeakOpt};
use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
use crate::common::{task, Exec, Future, Pin};
/// Test unique reservations.
#[derive(Debug, PartialEq, Eq)]
@@ -809,7 +792,8 @@ mod tests {
}
fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
let pool = Pool::new(super::Config {
let pool = Pool::new(
super::Config {
enabled: true,
keep_alive_timeout: Some(Duration::from_millis(100)),
max_idle_per_host: max_idle,
@@ -838,7 +822,8 @@ mod tests {
struct PollOnce<'a, F>(&'a mut F);
impl<F, T, U> Future for PollOnce<'_, F>
where F: Future<Output = Result<T, U>> + Unpin
where
F: Future<Output = Result<T, U>> + Unpin,
{
type Output = Option<()>;
@@ -846,7 +831,7 @@ mod tests {
match Pin::new(&mut self.0).poll(cx) {
Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
Poll::Ready(Err(_)) => Poll::Ready(Some(())),
Poll::Pending => Poll::Ready(None)
Poll::Pending => Poll::Ready(None),
}
}
}
@@ -875,7 +860,10 @@ mod tests {
pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99));
assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3));
assert_eq!(
pool.locked().idle.get(&key).map(|entries| entries.len()),
Some(3)
);
tokio::time::delay_for(pool.locked().timeout.unwrap()).await;
let mut checkout = pool.checkout(key.clone());
@@ -895,7 +883,10 @@ mod tests {
pool.pooled(c(key.clone()), Uniq(99));
// pooled and dropped 3, max_idle should only allow 2
assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(2));
assert_eq!(
pool.locked().idle.get(&key).map(|entries| entries.len()),
Some(2)
);
}
#[cfg(feature = "runtime")]
@@ -904,7 +895,8 @@ mod tests {
let _ = pretty_env_logger::try_init();
tokio::time::pause();
let pool = Pool::new(super::Config {
let pool = Pool::new(
super::Config {
enabled: true,
keep_alive_timeout: Some(Duration::from_millis(10)),
max_idle_per_host: ::std::usize::MAX,
@@ -918,7 +910,10 @@ mod tests {
pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99));
assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3));
assert_eq!(
pool.locked().idle.get(&key).map(|entries| entries.len()),
Some(3)
);
// Let the timer tick passed the expiration...
tokio::time::advance(Duration::from_millis(30)).await;
@@ -937,17 +932,15 @@ mod tests {
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(c(key.clone()), Uniq(41));
let checkout = join(
pool.checkout(key),
async {
// the checkout future will park first,
// and then this lazy future will be polled, which will insert
// the pooled back into the pool
//
// this test makes sure that doing so will unpark the checkout
drop(pooled);
},
).map(|(entry, _)| entry);
let checkout = join(pool.checkout(key), async {
// the checkout future will park first,
// and then this lazy future will be polled, which will insert
// the pooled back into the pool
//
// this test makes sure that doing so will unpark the checkout
drop(pooled);
})
.map(|(entry, _)| entry);
assert_eq!(*checkout.await.unwrap(), Uniq(41));
}
@@ -1001,10 +994,13 @@ mod tests {
fn pooled_drop_if_closed_doesnt_reinsert() {
let pool = pool_no_timer();
let key = Arc::new("localhost:12345".to_string());
pool.pooled(c(key.clone()), CanClose {
val: 57,
closed: true,
});
pool.pooled(
c(key.clone()),
CanClose {
val: 57,
closed: true,
},
);
assert!(!pool.locked().idle.contains_key(&key));
}

View File

@@ -2,12 +2,16 @@
//!
//! This module provides `Connect` which hook-ins into the Tower ecosystem.
use std::marker::PhantomData;
use std::future::Future;
use std::error::Error as StdError;
use std::future::Future;
use std::marker::PhantomData;
use crate::{common::{Poll, task, Pin}, body::Payload, service::{MakeConnection, Service}};
use super::conn::{SendRequest, Builder};
use super::conn::{Builder, SendRequest};
use crate::{
body::Payload,
common::{task, Pin, Poll},
service::{MakeConnection, Service},
};
/// Creates a connection via `SendRequest`.
///
@@ -18,7 +22,7 @@ use super::conn::{SendRequest, Builder};
pub struct Connect<C, B, T> {
inner: C,
builder: Builder,
_pd: PhantomData<fn(T, B)>
_pd: PhantomData<fn(T, B)>,
}
impl<C, B, T> Connect<C, B, T> {
@@ -28,7 +32,7 @@ impl<C, B, T> Connect<C, B, T> {
Self {
inner,
builder,
_pd: PhantomData
_pd: PhantomData,
}
}
}
@@ -44,10 +48,13 @@ where
{
type Response = SendRequest<B>;
type Error = crate::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into()))
self.inner
.poll_ready(cx)
.map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into()))
}
fn call(&mut self, req: T) -> Self::Future {
@@ -56,18 +63,16 @@ where
let fut = async move {
match io.await {
Ok(io) => {
match builder.handshake(io).await {
Ok((sr, conn)) => {
builder.exec.execute(async move {
if let Err(e) = conn.await {
debug!("connection error: {:?}", e);
}
});
Ok(sr)
},
Err(e) => Err(e)
Ok(io) => match builder.handshake(io).await {
Ok((sr, conn)) => {
builder.exec.execute(async move {
if let Err(e) = conn.await {
debug!("connection error: {:?}", e);
}
});
Ok(sr)
}
Err(e) => Err(e),
},
Err(e) => {
let err = crate::Error::new(crate::error::Kind::Connect).with(e.into());