Add connection_verbose setting to log IO events (#774)
This commit is contained in:
165
src/connect.rs
165
src/connect.rs
@@ -37,6 +37,7 @@ type HttpConnector = hyper::client::HttpConnector;
|
||||
pub(crate) struct Connector {
|
||||
inner: Inner,
|
||||
proxies: Arc<Vec<Proxy>>,
|
||||
verbose: verbose::Wrapper,
|
||||
timeout: Option<Duration>,
|
||||
#[cfg(feature = "__tls")]
|
||||
nodelay: bool,
|
||||
@@ -73,6 +74,7 @@ impl Connector {
|
||||
http.set_nodelay(nodelay);
|
||||
Ok(Connector {
|
||||
inner: Inner::Http(http),
|
||||
verbose: verbose::OFF,
|
||||
proxies,
|
||||
timeout: None,
|
||||
})
|
||||
@@ -98,6 +100,7 @@ impl Connector {
|
||||
Ok(Connector {
|
||||
inner: Inner::DefaultTls(http, tls),
|
||||
proxies,
|
||||
verbose: verbose::OFF,
|
||||
timeout: None,
|
||||
nodelay,
|
||||
user_agent,
|
||||
@@ -135,6 +138,7 @@ impl Connector {
|
||||
tls_proxy,
|
||||
},
|
||||
proxies,
|
||||
verbose: verbose::OFF,
|
||||
timeout: None,
|
||||
nodelay,
|
||||
user_agent,
|
||||
@@ -145,6 +149,10 @@ impl Connector {
|
||||
self.timeout = timeout;
|
||||
}
|
||||
|
||||
pub(crate) fn set_verbose(&mut self, enabled: bool) {
|
||||
self.verbose.0 = enabled;
|
||||
}
|
||||
|
||||
#[cfg(feature = "socks")]
|
||||
async fn connect_socks(
|
||||
&self,
|
||||
@@ -178,7 +186,7 @@ impl Connector {
|
||||
.await
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
return Ok(Conn {
|
||||
inner: Box::new(NativeTlsConn { inner: io }),
|
||||
inner: self.verbose.wrap(NativeTlsConn { inner: io }),
|
||||
is_proxy: false,
|
||||
});
|
||||
}
|
||||
@@ -203,7 +211,7 @@ impl Connector {
|
||||
.await
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
return Ok(Conn {
|
||||
inner: Box::new(RustlsTlsConn { inner: io }),
|
||||
inner: self.verbose.wrap(RustlsTlsConn { inner: io }),
|
||||
is_proxy: false,
|
||||
});
|
||||
}
|
||||
@@ -212,7 +220,10 @@ impl Connector {
|
||||
Inner::Http(_) => ()
|
||||
}
|
||||
|
||||
socks::connect(proxy, dst, dns).await
|
||||
socks::connect(proxy, dst, dns).await.map(|tcp| Conn {
|
||||
inner: self.verbose.wrap(tcp),
|
||||
is_proxy: false,
|
||||
})
|
||||
}
|
||||
|
||||
async fn connect_with_maybe_proxy(
|
||||
@@ -225,7 +236,7 @@ impl Connector {
|
||||
Inner::Http(mut http) => {
|
||||
let io = http.call(dst).await?;
|
||||
Ok(Conn {
|
||||
inner: Box::new(io),
|
||||
inner: self.verbose.wrap(io),
|
||||
is_proxy,
|
||||
})
|
||||
}
|
||||
@@ -246,7 +257,7 @@ impl Connector {
|
||||
//}
|
||||
|
||||
Ok(Conn {
|
||||
inner: Box::new(io),
|
||||
inner: self.verbose.wrap(io),
|
||||
is_proxy,
|
||||
})
|
||||
}
|
||||
@@ -270,7 +281,7 @@ impl Connector {
|
||||
}
|
||||
|
||||
Ok(Conn {
|
||||
inner: Box::new(io),
|
||||
inner: self.verbose.wrap(io),
|
||||
is_proxy,
|
||||
})
|
||||
}
|
||||
@@ -322,7 +333,7 @@ impl Connector {
|
||||
.await
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
return Ok(Conn {
|
||||
inner: Box::new(NativeTlsConn { inner: io }),
|
||||
inner: self.verbose.wrap(NativeTlsConn { inner: io }),
|
||||
is_proxy: false,
|
||||
});
|
||||
}
|
||||
@@ -359,7 +370,7 @@ impl Connector {
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
|
||||
return Ok(Conn {
|
||||
inner: Box::new(RustlsTlsConn { inner: io }),
|
||||
inner: self.verbose.wrap(RustlsTlsConn { inner: io }),
|
||||
is_proxy: false,
|
||||
});
|
||||
}
|
||||
@@ -438,8 +449,11 @@ impl Service<Uri> for Connector
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait AsyncConn: AsyncRead + AsyncWrite + Connection {}
|
||||
impl<T: AsyncRead + AsyncWrite + Connection> AsyncConn for T {}
|
||||
pub(crate) trait AsyncConn: AsyncRead + AsyncWrite + Connection + Send + Sync + Unpin + 'static {}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + Connection + Send + Sync + Unpin + 'static> AsyncConn for T {}
|
||||
|
||||
type BoxConn = Box<dyn AsyncConn>;
|
||||
|
||||
pin_project! {
|
||||
/// Note: the `is_proxy` member means *is plain text HTTP proxy*.
|
||||
@@ -448,7 +462,7 @@ pin_project! {
|
||||
/// * absolute-form (`GET http://foo.bar/and/a/path HTTP/1.1`), otherwise.
|
||||
pub(crate) struct Conn {
|
||||
#[pin]
|
||||
inner: Box<dyn AsyncConn + Send + Sync + Unpin + 'static>,
|
||||
inner: BoxConn,
|
||||
is_proxy: bool,
|
||||
}
|
||||
}
|
||||
@@ -795,11 +809,13 @@ mod rustls_tls_conn {
|
||||
|
||||
#[cfg(feature = "socks")]
|
||||
mod socks {
|
||||
use http::Uri;
|
||||
use tokio_socks::tcp::Socks5Stream;
|
||||
use std::io;
|
||||
use std::net::ToSocketAddrs;
|
||||
|
||||
use http::Uri;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_socks::tcp::Socks5Stream;
|
||||
|
||||
use super::{BoxError, Scheme};
|
||||
use crate::proxy::ProxyScheme;
|
||||
|
||||
@@ -812,7 +828,7 @@ mod socks {
|
||||
proxy: ProxyScheme,
|
||||
dst: Uri,
|
||||
dns: DnsResolve,
|
||||
) -> Result<super::Conn, BoxError> {
|
||||
) -> Result<TcpStream, BoxError> {
|
||||
let https = dst.scheme() == Some(&Scheme::HTTPS);
|
||||
let original_host = dst
|
||||
.host()
|
||||
@@ -852,10 +868,123 @@ mod socks {
|
||||
.map_err(|e| format!("socks connect error: {}", e))?
|
||||
};
|
||||
|
||||
Ok(super::Conn {
|
||||
inner: Box::new( stream.into_inner() ),
|
||||
is_proxy: false,
|
||||
})
|
||||
Ok(stream.into_inner())
|
||||
}
|
||||
}
|
||||
|
||||
mod verbose {
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use hyper::client::connect::{Connected, Connection};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
pub(super) const OFF: Wrapper = Wrapper(false);
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub(super) struct Wrapper(pub(super) bool);
|
||||
|
||||
impl Wrapper {
|
||||
pub(super) fn wrap<T: super::AsyncConn>(&self, conn: T) -> super::BoxConn {
|
||||
if self.0 && log::log_enabled!(log::Level::Trace) {
|
||||
Box::new(Verbose {
|
||||
// truncate is fine
|
||||
id: crate::util::fast_random() as u32,
|
||||
inner: conn,
|
||||
})
|
||||
} else {
|
||||
Box::new(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Verbose<T> {
|
||||
id: u32,
|
||||
inner: T,
|
||||
}
|
||||
|
||||
impl<T: Connection + AsyncRead + AsyncWrite + Unpin> Connection for Verbose<T> {
|
||||
fn connected(&self) -> Connected {
|
||||
self.inner.connected()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + Unpin> AsyncRead for Verbose<T> {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
buf: &mut [u8]
|
||||
) -> Poll<std::io::Result<usize>> {
|
||||
match Pin::new(&mut self.inner).poll_read(cx, buf) {
|
||||
Poll::Ready(Ok(n)) => {
|
||||
log::trace!("{:08x} read: {:?}", self.id, Escape(&buf[..n]));
|
||||
Poll::Ready(Ok(n))
|
||||
},
|
||||
Poll::Ready(Err(e)) => {
|
||||
Poll::Ready(Err(e))
|
||||
},
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + Unpin> AsyncWrite for Verbose<T> {
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
buf: &[u8]
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
match Pin::new(&mut self.inner).poll_write(cx, buf) {
|
||||
Poll::Ready(Ok(n)) => {
|
||||
log::trace!("{:08x} write: {:?}", self.id, Escape(&buf[..n]));
|
||||
Poll::Ready(Ok(n))
|
||||
},
|
||||
Poll::Ready(Err(e)) => {
|
||||
Poll::Ready(Err(e))
|
||||
},
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), std::io::Error>> {
|
||||
Pin::new(&mut self.inner).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context
|
||||
) -> Poll<Result<(), std::io::Error>> {
|
||||
Pin::new(&mut self.inner).poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
struct Escape<'a>(&'a [u8]);
|
||||
|
||||
impl fmt::Debug for Escape<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "b\"")?;
|
||||
for &c in self.0 {
|
||||
// https://doc.rust-lang.org/reference.html#byte-escapes
|
||||
if c == b'\n' {
|
||||
write!(f, "\\n")?;
|
||||
} else if c == b'\r' {
|
||||
write!(f, "\\r")?;
|
||||
} else if c == b'\t' {
|
||||
write!(f, "\\t")?;
|
||||
} else if c == b'\\' || c == b'"' {
|
||||
write!(f, "\\{}", c as char)?;
|
||||
} else if c == b'\0' {
|
||||
write!(f, "\\0")?;
|
||||
// ASCII printable
|
||||
} else if c >= 0x20 && c < 0x7f {
|
||||
write!(f, "{}", c as char)?;
|
||||
} else {
|
||||
write!(f, "\\x{:02x}", c)?;
|
||||
}
|
||||
}
|
||||
write!(f, "\"")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user