feat(client): remove client::connect module (#2949)
Some checks failed
CI / CI is green (push) Has been cancelled
CI / Check Style (push) Has been cancelled
CI / Test beta on macOS-latest (push) Has been cancelled
CI / Test stable on macOS-latest (push) Has been cancelled
CI / Test beta on ubuntu-latest (push) Has been cancelled
CI / Test stable on ubuntu-latest (push) Has been cancelled
CI / Test beta on windows-latest (push) Has been cancelled
CI / Test stable on windows-latest (push) Has been cancelled
CI / Test nightly on macOS-latest (push) Has been cancelled
CI / Test nightly on ubuntu-latest (push) Has been cancelled
CI / Test nightly on windows-latest (push) Has been cancelled
CI / Check MSRV (1.56) (push) Has been cancelled
CI / Test with Miri (push) Has been cancelled
CI / features (push) Has been cancelled
CI / Test C API (FFI) (push) Has been cancelled
CI / Verify hyper.h is up to date (push) Has been cancelled
CI / Build docs (push) Has been cancelled
Benchmark / Benchmark (push) Has been cancelled

The connect pieces will be available in `hyper-util`.

BREAKING CHANGE: Use `connect` from `hyper-util`.
This commit is contained in:
Sean McArthur
2022-08-17 10:50:40 -07:00
committed by GitHub
parent bb3af17ce1
commit 5e20688398
6 changed files with 0 additions and 960 deletions

View File

@@ -1,239 +0,0 @@
//! DNS Resolution used by the `HttpConnector`.
//!
//! This module contains:
//!
//! - A [`GaiResolver`](GaiResolver) that is the default resolver for the
//! `HttpConnector`.
//! - The `Name` type used as an argument to custom resolvers.
//!
//! # Resolvers are `Service`s
//!
//! A resolver is just a
//! `Service<Name, Response = impl Iterator<Item = SocketAddr>>`.
//!
//! A simple resolver that ignores the name and always returns a specific
//! address:
//!
//! ```rust,ignore
//! use std::{convert::Infallible, iter, net::SocketAddr};
//!
//! let resolver = tower::service_fn(|_name| async {
//! Ok::<_, Infallible>(iter::once(SocketAddr::from(([127, 0, 0, 1], 8080))))
//! });
//! ```
use std::error::Error;
use std::net::{SocketAddr};
use std::str::FromStr;
use std::{fmt, vec};
/// A domain name to resolve into IP addresses.
#[derive(Clone, Hash, Eq, PartialEq)]
pub struct Name {
host: Box<str>,
}
/// A resolver using blocking `getaddrinfo` calls in a threadpool.
#[derive(Clone)]
pub struct GaiResolver {
_priv: (),
}
/// An iterator of IP addresses returned from `getaddrinfo`.
pub struct GaiAddrs {
inner: SocketAddrs,
}
impl Name {
pub(super) fn new(host: Box<str>) -> Name {
Name { host }
}
/// View the hostname as a string slice.
pub fn as_str(&self) -> &str {
&self.host
}
}
impl fmt::Debug for Name {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.host, f)
}
}
impl fmt::Display for Name {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.host, f)
}
}
impl FromStr for Name {
type Err = InvalidNameError;
fn from_str(host: &str) -> Result<Self, Self::Err> {
// Possibly add validation later
Ok(Name::new(host.into()))
}
}
/// Error indicating a given string was not a valid domain name.
#[derive(Debug)]
pub struct InvalidNameError(());
impl fmt::Display for InvalidNameError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Not a valid domain name")
}
}
impl Error for InvalidNameError {}
impl GaiResolver {
/// Construct a new `GaiResolver`.
pub fn new() -> Self {
GaiResolver { _priv: () }
}
}
impl fmt::Debug for GaiResolver {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("GaiResolver")
}
}
impl Iterator for GaiAddrs {
type Item = SocketAddr;
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
}
impl fmt::Debug for GaiAddrs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("GaiAddrs")
}
}
pub(super) struct SocketAddrs {
iter: vec::IntoIter<SocketAddr>,
}
impl Iterator for SocketAddrs {
type Item = SocketAddr;
#[inline]
fn next(&mut self) -> Option<SocketAddr> {
self.iter.next()
}
}
/*
/// A resolver using `getaddrinfo` calls via the `tokio_executor::threadpool::blocking` API.
///
/// Unlike the `GaiResolver` this will not spawn dedicated threads, but only works when running on the
/// multi-threaded Tokio runtime.
#[cfg(feature = "runtime")]
#[derive(Clone, Debug)]
pub struct TokioThreadpoolGaiResolver(());
/// The future returned by `TokioThreadpoolGaiResolver`.
#[cfg(feature = "runtime")]
#[derive(Debug)]
pub struct TokioThreadpoolGaiFuture {
name: Name,
}
#[cfg(feature = "runtime")]
impl TokioThreadpoolGaiResolver {
/// Creates a new DNS resolver that will use tokio threadpool's blocking
/// feature.
///
/// **Requires** its futures to be run on the threadpool runtime.
pub fn new() -> Self {
TokioThreadpoolGaiResolver(())
}
}
#[cfg(feature = "runtime")]
impl Service<Name> for TokioThreadpoolGaiResolver {
type Response = GaiAddrs;
type Error = io::Error;
type Future = TokioThreadpoolGaiFuture;
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, name: Name) -> Self::Future {
TokioThreadpoolGaiFuture { name }
}
}
#[cfg(feature = "runtime")]
impl Future for TokioThreadpoolGaiFuture {
type Output = Result<GaiAddrs, io::Error>;
fn poll(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match ready!(tokio_executor::threadpool::blocking(|| (
self.name.as_str(),
0
)
.to_socket_addrs()))
{
Ok(Ok(iter)) => Poll::Ready(Ok(GaiAddrs {
inner: IpAddrs { iter },
})),
Ok(Err(e)) => Poll::Ready(Err(e)),
// a BlockingError, meaning not on a tokio_executor::threadpool :(
Err(e) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
}
}
}
*/
mod sealed {
use super::{Name, SocketAddr};
use crate::common::{task, Future, Poll};
use tower_service::Service;
// "Trait alias" for `Service<Name, Response = Addrs>`
pub(crate) trait Resolve {
type Addrs: Iterator<Item = SocketAddr>;
type Error: Into<Box<dyn std::error::Error + Send + Sync>>;
type Future: Future<Output = Result<Self::Addrs, Self::Error>>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
fn resolve(&mut self, name: Name) -> Self::Future;
}
impl<S> Resolve for S
where
S: Service<Name>,
S::Response: Iterator<Item = SocketAddr>,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Addrs = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Service::poll_ready(self, cx)
}
fn resolve(&mut self, name: Name) -> Self::Future {
Service::call(self, name)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_name_from_str() {
const DOMAIN: &str = "test.example.com";
let name = Name::from_str(DOMAIN).expect("Should be a valid domain");
assert_eq!(name.as_str(), DOMAIN);
assert_eq!(name.to_string(), DOMAIN);
}
}

View File

@@ -1,252 +0,0 @@
use std::error::Error as StdError;
use std::fmt;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
//#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver;
/// A connector for the `http` scheme.
///
/// Performs DNS resolution in a thread pool, and then connects over TCP.
///
/// # Note
///
/// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes
/// transport information such as the remote socket address used.
#[derive(Clone)]
pub struct HttpConnector {
config: Arc<Config>,
}
/// Extra information about the transport when an HttpConnector is used.
///
/// # Note
///
/// If a different connector is used besides [`HttpConnector`](HttpConnector),
/// this value will not exist in the extensions. Consult that specific
/// connector to see what "extra" information it might provide to responses.
#[derive(Clone, Debug)]
pub struct HttpInfo {
remote_addr: SocketAddr,
local_addr: SocketAddr,
}
#[derive(Clone)]
struct Config {
connect_timeout: Option<Duration>,
enforce_http: bool,
happy_eyeballs_timeout: Option<Duration>,
keep_alive_timeout: Option<Duration>,
local_address_ipv4: Option<Ipv4Addr>,
local_address_ipv6: Option<Ipv6Addr>,
nodelay: bool,
reuse_address: bool,
send_buffer_size: Option<usize>,
recv_buffer_size: Option<usize>,
}
// ===== impl HttpConnector =====
impl HttpConnector {
/// Construct a new HttpConnector.
pub fn new() -> HttpConnector {
HttpConnector {
config: Arc::new(Config {
connect_timeout: None,
enforce_http: true,
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
keep_alive_timeout: None,
local_address_ipv4: None,
local_address_ipv6: None,
nodelay: false,
reuse_address: false,
send_buffer_size: None,
recv_buffer_size: None,
}),
}
}
}
/*
#[cfg(feature = "runtime")]
impl HttpConnector<TokioThreadpoolGaiResolver> {
/// Construct a new HttpConnector using the `TokioThreadpoolGaiResolver`.
///
/// This resolver **requires** the threadpool runtime to be used.
pub fn new_with_tokio_threadpool_resolver() -> Self {
HttpConnector::new_with_resolver(TokioThreadpoolGaiResolver::new())
}
}
*/
impl HttpConnector {
/// Option to enforce all `Uri`s have the `http` scheme.
///
/// Enabled by default.
#[inline]
pub fn enforce_http(&mut self, is_enforced: bool) {
self.config_mut().enforce_http = is_enforced;
}
/// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration.
///
/// If `None`, the option will not be set.
///
/// Default is `None`.
#[inline]
pub fn set_keepalive(&mut self, dur: Option<Duration>) {
self.config_mut().keep_alive_timeout = dur;
}
/// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`.
///
/// Default is `false`.
#[inline]
pub fn set_nodelay(&mut self, nodelay: bool) {
self.config_mut().nodelay = nodelay;
}
/// Sets the value of the SO_SNDBUF option on the socket.
#[inline]
pub fn set_send_buffer_size(&mut self, size: Option<usize>) {
self.config_mut().send_buffer_size = size;
}
/// Sets the value of the SO_RCVBUF option on the socket.
#[inline]
pub fn set_recv_buffer_size(&mut self, size: Option<usize>) {
self.config_mut().recv_buffer_size = size;
}
/// Set that all sockets are bound to the configured address before connection.
///
/// If `None`, the sockets will not be bound.
///
/// Default is `None`.
#[inline]
pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
let (v4, v6) = match addr {
Some(IpAddr::V4(a)) => (Some(a), None),
Some(IpAddr::V6(a)) => (None, Some(a)),
_ => (None, None),
};
let cfg = self.config_mut();
cfg.local_address_ipv4 = v4;
cfg.local_address_ipv6 = v6;
}
/// Set that all sockets are bound to the configured IPv4 or IPv6 address (depending on host's
/// preferences) before connection.
#[inline]
pub fn set_local_addresses(&mut self, addr_ipv4: Ipv4Addr, addr_ipv6: Ipv6Addr) {
let cfg = self.config_mut();
cfg.local_address_ipv4 = Some(addr_ipv4);
cfg.local_address_ipv6 = Some(addr_ipv6);
}
/// Set the connect timeout.
///
/// If a domain resolves to multiple IP addresses, the timeout will be
/// evenly divided across them.
///
/// Default is `None`.
#[inline]
pub fn set_connect_timeout(&mut self, dur: Option<Duration>) {
self.config_mut().connect_timeout = dur;
}
/// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm.
///
/// If hostname resolves to both IPv4 and IPv6 addresses and connection
/// cannot be established using preferred address family before timeout
/// elapses, then connector will in parallel attempt connection using other
/// address family.
///
/// If `None`, parallel connection attempts are disabled.
///
/// Default is 300 milliseconds.
///
/// [RFC 6555]: https://tools.ietf.org/html/rfc6555
#[inline]
pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
self.config_mut().happy_eyeballs_timeout = dur;
}
/// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`.
///
/// Default is `false`.
#[inline]
pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self {
self.config_mut().reuse_address = reuse_address;
self
}
// private
fn config_mut(&mut self) -> &mut Config {
// If the are HttpConnector clones, this will clone the inner
// config. So mutating the config won't ever affect previous
// clones.
Arc::make_mut(&mut self.config)
}
}
// R: Debug required for now to allow adding it to debug output later...
impl fmt::Debug for HttpConnector {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("HttpConnector").finish()
}
}
impl HttpInfo {
/// Get the remote address of the transport used.
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}
/// Get the local address of the transport used.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
}
// Not publicly exported (so missing_docs doesn't trigger).
pub(crate) struct ConnectError {
msg: Box<str>,
cause: Option<Box<dyn StdError + Send + Sync>>,
}
impl fmt::Debug for ConnectError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(ref cause) = self.cause {
f.debug_tuple("ConnectError")
.field(&self.msg)
.field(cause)
.finish()
} else {
self.msg.fmt(f)
}
}
}
impl fmt::Display for ConnectError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.msg)?;
if let Some(ref cause) = self.cause {
write!(f, ": {}", cause)?;
}
Ok(())
}
}
impl StdError for ConnectError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
self.cause.as_ref().map(|e| &**e as _)
}
}

View File

@@ -1,391 +0,0 @@
//! Connectors used by the `Client`.
//!
//! This module contains:
//!
//! - A default [`HttpConnector`][] that does DNS resolution and establishes
//! connections over TCP.
//! - Types to build custom connectors.
//!
//! # Connectors
//!
//! A "connector" is a [`Service`][] that takes a [`Uri`][] destination, and
//! its `Response` is some type implementing [`AsyncRead`][], [`AsyncWrite`][],
//! and [`Connection`][].
//!
//! ## Custom Connectors
//!
//! A simple connector that ignores the `Uri` destination and always returns
//! a TCP connection to the same address could be written like this:
//!
//! ```rust,ignore
//! let connector = tower::service_fn(|_dst| async {
//! tokio::net::TcpStream::connect("127.0.0.1:1337")
//! })
//! ```
//!
//! Or, fully written out:
//!
//! ```
//! # #[cfg(feature = "runtime")]
//! # mod rt {
//! use std::{future::Future, net::SocketAddr, pin::Pin, task::{self, Poll}};
//! use hyper::{service::Service, Uri};
//! use tokio::net::TcpStream;
//!
//! #[derive(Clone)]
//! struct LocalConnector;
//!
//! impl Service<Uri> for LocalConnector {
//! type Response = TcpStream;
//! type Error = std::io::Error;
//! // We can't "name" an `async` generated future.
//! type Future = Pin<Box<
//! dyn Future<Output = Result<Self::Response, Self::Error>> + Send
//! >>;
//!
//! fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
//! // This connector is always ready, but others might not be.
//! Poll::Ready(Ok(()))
//! }
//!
//! fn call(&mut self, _: Uri) -> Self::Future {
//! Box::pin(TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], 1337))))
//! }
//! }
//! # }
//! ```
//!
//! [`Uri`]: ::http::Uri
//! [`AsyncRead`]: tokio::io::AsyncRead
//! [`AsyncWrite`]: tokio::io::AsyncWrite
//! [`Connection`]: Connection
//! [`Service`]: crate::service::Service
use std::fmt;
use ::http::Extensions;
pub use self::http::{HttpConnector, HttpInfo};
pub mod dns;
mod http;
cfg_feature! {
#![any(feature = "http1", feature = "http2")]
pub use self::sealed::Connect;
}
/// Describes a type returned by a connector.
pub trait Connection {
/// Return metadata describing the connection.
fn connected(&self) -> Connected;
}
/// Extra information about the connected transport.
///
/// This can be used to inform recipients about things like if ALPN
/// was used, or if connected to an HTTP proxy.
#[derive(Debug)]
pub struct Connected {
pub(super) alpn: Alpn,
pub(super) is_proxied: bool,
pub(super) extra: Option<Extra>,
}
pub(super) struct Extra(Box<dyn ExtraInner>);
#[derive(Clone, Copy, Debug, PartialEq)]
pub(super) enum Alpn {
H2,
None,
}
impl Connected {
/// Create new `Connected` type with empty metadata.
pub fn new() -> Connected {
Connected {
alpn: Alpn::None,
is_proxied: false,
extra: None,
}
}
/// Set whether the connected transport is to an HTTP proxy.
///
/// This setting will affect if HTTP/1 requests written on the transport
/// will have the request-target in absolute-form or origin-form:
///
/// - When `proxy(false)`:
///
/// ```http
/// GET /guide HTTP/1.1
/// ```
///
/// - When `proxy(true)`:
///
/// ```http
/// GET http://hyper.rs/guide HTTP/1.1
/// ```
///
/// Default is `false`.
pub fn proxy(mut self, is_proxied: bool) -> Connected {
self.is_proxied = is_proxied;
self
}
/// Determines if the connected transport is to an HTTP proxy.
pub fn is_proxied(&self) -> bool {
self.is_proxied
}
/// Set extra connection information to be set in the extensions of every `Response`.
pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected {
if let Some(prev) = self.extra {
self.extra = Some(Extra(Box::new(ExtraChain(prev.0, extra))));
} else {
self.extra = Some(Extra(Box::new(ExtraEnvelope(extra))));
}
self
}
/// Copies the extra connection information into an `Extensions` map.
pub fn get_extras(&self, extensions: &mut Extensions) {
if let Some(extra) = &self.extra {
extra.set(extensions);
}
}
/// Set that the connected transport negotiated HTTP/2 as its next protocol.
pub fn negotiated_h2(mut self) -> Connected {
self.alpn = Alpn::H2;
self
}
/// Determines if the connected transport negotiated HTTP/2 as its next protocol.
pub fn is_negotiated_h2(&self) -> bool {
self.alpn == Alpn::H2
}
/*
// Don't public expose that `Connected` is `Clone`, unsure if we want to
// keep that contract...
#[cfg(feature = "http2")]
pub(super) fn clone(&self) -> Connected {
Connected {
alpn: self.alpn.clone(),
is_proxied: self.is_proxied,
extra: self.extra.clone(),
}
}
*/
}
// ===== impl Extra =====
impl Extra {
pub(super) fn set(&self, res: &mut Extensions) {
self.0.set(res);
}
}
impl Clone for Extra {
fn clone(&self) -> Extra {
Extra(self.0.clone_box())
}
}
impl fmt::Debug for Extra {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Extra").finish()
}
}
trait ExtraInner: Send + Sync {
fn clone_box(&self) -> Box<dyn ExtraInner>;
fn set(&self, res: &mut Extensions);
}
// This indirection allows the `Connected` to have a type-erased "extra" value,
// while that type still knows its inner extra type. This allows the correct
// TypeId to be used when inserting into `res.extensions_mut()`.
#[derive(Clone)]
struct ExtraEnvelope<T>(T);
impl<T> ExtraInner for ExtraEnvelope<T>
where
T: Clone + Send + Sync + 'static,
{
fn clone_box(&self) -> Box<dyn ExtraInner> {
Box::new(self.clone())
}
fn set(&self, res: &mut Extensions) {
res.insert(self.0.clone());
}
}
struct ExtraChain<T>(Box<dyn ExtraInner>, T);
impl<T: Clone> Clone for ExtraChain<T> {
fn clone(&self) -> Self {
ExtraChain(self.0.clone_box(), self.1.clone())
}
}
impl<T> ExtraInner for ExtraChain<T>
where
T: Clone + Send + Sync + 'static,
{
fn clone_box(&self) -> Box<dyn ExtraInner> {
Box::new(self.clone())
}
fn set(&self, res: &mut Extensions) {
self.0.set(res);
res.insert(self.1.clone());
}
}
#[cfg(any(feature = "http1", feature = "http2"))]
pub(super) mod sealed {
use std::error::Error as StdError;
use ::http::Uri;
use tokio::io::{AsyncRead, AsyncWrite};
use super::Connection;
use crate::common::{Future, Unpin};
/// Connect to a destination, returning an IO transport.
///
/// A connector receives a [`Uri`](::http::Uri) and returns a `Future` of the
/// ready connection.
///
/// # Trait Alias
///
/// This is really just an *alias* for the `tower::Service` trait, with
/// additional bounds set for convenience *inside* hyper. You don't actually
/// implement this trait, but `tower::Service<Uri>` instead.
// The `Sized` bound is to prevent creating `dyn Connect`, since they cannot
// fit the `Connect` bounds because of the blanket impl for `Service`.
pub trait Connect: Sealed + Sized {
#[doc(hidden)]
type _Svc: ConnectSvc;
#[doc(hidden)]
fn connect(self, internal_only: Internal, dst: Uri) -> <Self::_Svc as ConnectSvc>::Future;
}
pub trait ConnectSvc {
type Connection: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static;
type Error: Into<Box<dyn StdError + Send + Sync>>;
type Future: Future<Output = Result<Self::Connection, Self::Error>> + Unpin + Send + 'static;
fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future;
}
impl<S, T> Connect for S
where
S: tower_service::Service<Uri, Response = T> + Send + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
type _Svc = S;
fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> {
crate::service::oneshot(self, dst)
}
}
impl<S, T> ConnectSvc for S
where
S: tower_service::Service<Uri, Response = T> + Send + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
type Connection = T;
type Error = S::Error;
type Future = crate::service::Oneshot<S, Uri>;
fn connect(self, _: Internal, dst: Uri) -> Self::Future {
crate::service::oneshot(self, dst)
}
}
impl<S, T> Sealed for S
where
S: tower_service::Service<Uri, Response = T> + Send,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
}
pub trait Sealed {}
#[allow(missing_debug_implementations)]
pub struct Internal;
}
#[cfg(test)]
mod tests {
use super::Connected;
#[derive(Clone, Debug, PartialEq)]
struct Ex1(usize);
#[derive(Clone, Debug, PartialEq)]
struct Ex2(&'static str);
#[derive(Clone, Debug, PartialEq)]
struct Ex3(&'static str);
#[test]
fn test_connected_extra() {
let c1 = Connected::new().extra(Ex1(41));
let mut ex = ::http::Extensions::new();
assert_eq!(ex.get::<Ex1>(), None);
c1.extra.as_ref().expect("c1 extra").set(&mut ex);
assert_eq!(ex.get::<Ex1>(), Some(&Ex1(41)));
}
#[test]
fn test_connected_extra_chain() {
// If a user composes connectors and at each stage, there's "extra"
// info to attach, it shouldn't override the previous extras.
let c1 = Connected::new()
.extra(Ex1(45))
.extra(Ex2("zoom"))
.extra(Ex3("pew pew"));
let mut ex1 = ::http::Extensions::new();
assert_eq!(ex1.get::<Ex1>(), None);
assert_eq!(ex1.get::<Ex2>(), None);
assert_eq!(ex1.get::<Ex3>(), None);
c1.extra.as_ref().expect("c1 extra").set(&mut ex1);
assert_eq!(ex1.get::<Ex1>(), Some(&Ex1(45)));
assert_eq!(ex1.get::<Ex2>(), Some(&Ex2("zoom")));
assert_eq!(ex1.get::<Ex3>(), Some(&Ex3("pew pew")));
// Just like extensions, inserting the same type overrides previous type.
let c2 = Connected::new()
.extra(Ex1(33))
.extra(Ex2("hiccup"))
.extra(Ex1(99));
let mut ex2 = ::http::Extensions::new();
c2.extra.as_ref().expect("c2 extra").set(&mut ex2);
assert_eq!(ex2.get::<Ex1>(), Some(&Ex1(99)));
assert_eq!(ex2.get::<Ex2>(), Some(&Ex2("hiccup")));
}
}

View File

@@ -7,7 +7,6 @@
//! For a small example program simply fetching a URL, take a look at the
//! [full client example](https://github.com/hyperium/hyper/blob/master/examples/client.rs).
pub mod connect;
#[cfg(all(test, feature = "runtime"))]
mod tests;

View File

@@ -24,13 +24,9 @@
pub use tower_service::Service;
mod http;
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "client"))]
mod oneshot;
mod util;
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))]
pub(super) use self::http::HttpService;
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "client"))]
pub(super) use self::oneshot::{oneshot, Oneshot};
pub use self::util::service_fn;

View File

@@ -1,73 +0,0 @@
// TODO: Eventually to be replaced with tower_util::Oneshot.
use pin_project_lite::pin_project;
use tower_service::Service;
use crate::common::{task, Future, Pin, Poll};
pub(crate) fn oneshot<S, Req>(svc: S, req: Req) -> Oneshot<S, Req>
where
S: Service<Req>,
{
Oneshot {
state: State::NotReady { svc, req },
}
}
pin_project! {
// A `Future` consuming a `Service` and request, waiting until the `Service`
// is ready, and then calling `Service::call` with the request, and
// waiting for that `Future`.
#[allow(missing_debug_implementations)]
pub struct Oneshot<S: Service<Req>, Req> {
#[pin]
state: State<S, Req>,
}
}
pin_project! {
#[project = StateProj]
#[project_replace = StateProjOwn]
enum State<S: Service<Req>, Req> {
NotReady {
svc: S,
req: Req,
},
Called {
#[pin]
fut: S::Future,
},
Tmp,
}
}
impl<S, Req> Future for Oneshot<S, Req>
where
S: Service<Req>,
{
type Output = Result<S::Response, S::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut me = self.project();
loop {
match me.state.as_mut().project() {
StateProj::NotReady { ref mut svc, .. } => {
ready!(svc.poll_ready(cx))?;
// fallthrough out of the match's borrow
}
StateProj::Called { fut } => {
return fut.poll(cx);
}
StateProj::Tmp => unreachable!(),
}
match me.state.as_mut().project_replace(State::Tmp) {
StateProjOwn::NotReady { mut svc, req } => {
me.state.set(State::Called { fut: svc.call(req) });
}
_ => unreachable!(),
}
}
}
}