feat(server): remove AddrStream struct (#2869)
remove addrstream type, it provides no benefit over tokio::net::tcpstream Closes #2850
This commit is contained in:
@@ -81,9 +81,6 @@ cfg_feature! {
|
|||||||
pub(super) use self::upgrades::UpgradeableConnection;
|
pub(super) use self::upgrades::UpgradeableConnection;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "tcp")]
|
|
||||||
pub use super::tcp::{AddrIncoming, AddrStream};
|
|
||||||
|
|
||||||
/// A lower-level configuration of the HTTP protocol.
|
/// A lower-level configuration of the HTTP protocol.
|
||||||
///
|
///
|
||||||
/// This structure is used to configure options for an HTTP server connection.
|
/// This structure is used to configure options for an HTTP server connection.
|
||||||
|
|||||||
@@ -92,7 +92,7 @@
|
|||||||
//! use hyper::{Body, Request, Response, Server};
|
//! use hyper::{Body, Request, Response, Server};
|
||||||
//! use hyper::service::{make_service_fn, service_fn};
|
//! use hyper::service::{make_service_fn, service_fn};
|
||||||
//! # #[cfg(feature = "runtime")]
|
//! # #[cfg(feature = "runtime")]
|
||||||
//! use hyper::server::conn::AddrStream;
|
//! use tokio::net::TcpStream;
|
||||||
//!
|
//!
|
||||||
//! #[derive(Clone)]
|
//! #[derive(Clone)]
|
||||||
//! struct AppContext {
|
//! struct AppContext {
|
||||||
@@ -115,14 +115,14 @@
|
|||||||
//! };
|
//! };
|
||||||
//!
|
//!
|
||||||
//! // A `MakeService` that produces a `Service` to handle each connection.
|
//! // A `MakeService` that produces a `Service` to handle each connection.
|
||||||
//! let make_service = make_service_fn(move |conn: &AddrStream| {
|
//! let make_service = make_service_fn(move |conn: &TcpStream| {
|
||||||
//! // We have to clone the context to share it with each invocation of
|
//! // We have to clone the context to share it with each invocation of
|
||||||
//! // `make_service`. If your data doesn't implement `Clone` consider using
|
//! // `make_service`. If your data doesn't implement `Clone` consider using
|
||||||
//! // an `std::sync::Arc`.
|
//! // an `std::sync::Arc`.
|
||||||
//! let context = context.clone();
|
//! let context = context.clone();
|
||||||
//!
|
//!
|
||||||
//! // You can grab the address of the incoming connection like so.
|
//! // You can grab the address of the incoming connection like so.
|
||||||
//! let addr = conn.remote_addr();
|
//! let addr = conn.peer_addr().unwrap();
|
||||||
//!
|
//!
|
||||||
//! // Create a `Service` for responding to the request.
|
//! // Create a `Service` for responding to the request.
|
||||||
//! let service = service_fn(move |req| {
|
//! let service = service_fn(move |req| {
|
||||||
|
|||||||
@@ -3,14 +3,12 @@ use std::io;
|
|||||||
use std::net::{SocketAddr, TcpListener as StdTcpListener};
|
use std::net::{SocketAddr, TcpListener as StdTcpListener};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::time::Sleep;
|
use tokio::time::Sleep;
|
||||||
use tracing::{debug, error, trace};
|
use tracing::{debug, error, trace};
|
||||||
|
|
||||||
use crate::common::{task, Future, Pin, Poll};
|
use crate::common::{task, Future, Pin, Poll};
|
||||||
|
|
||||||
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
|
|
||||||
pub use self::addr_stream::AddrStream;
|
|
||||||
use super::accept::Accept;
|
use super::accept::Accept;
|
||||||
|
|
||||||
/// A stream of connections from binding to an address.
|
/// A stream of connections from binding to an address.
|
||||||
@@ -98,7 +96,7 @@ impl AddrIncoming {
|
|||||||
self.sleep_on_errors = val;
|
self.sleep_on_errors = val;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_next_(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<AddrStream>> {
|
fn poll_next_(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<TcpStream>> {
|
||||||
// Check if a previous timeout is active that was set by IO errors.
|
// Check if a previous timeout is active that was set by IO errors.
|
||||||
if let Some(ref mut to) = self.timeout {
|
if let Some(ref mut to) = self.timeout {
|
||||||
ready!(Pin::new(to).poll(cx));
|
ready!(Pin::new(to).poll(cx));
|
||||||
@@ -107,7 +105,7 @@ impl AddrIncoming {
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
match ready!(self.listener.poll_accept(cx)) {
|
match ready!(self.listener.poll_accept(cx)) {
|
||||||
Ok((socket, remote_addr)) => {
|
Ok((socket, _)) => {
|
||||||
if let Some(dur) = self.tcp_keepalive_timeout {
|
if let Some(dur) = self.tcp_keepalive_timeout {
|
||||||
let socket = socket2::SockRef::from(&socket);
|
let socket = socket2::SockRef::from(&socket);
|
||||||
let conf = socket2::TcpKeepalive::new().with_time(dur);
|
let conf = socket2::TcpKeepalive::new().with_time(dur);
|
||||||
@@ -118,8 +116,7 @@ impl AddrIncoming {
|
|||||||
if let Err(e) = socket.set_nodelay(self.tcp_nodelay) {
|
if let Err(e) = socket.set_nodelay(self.tcp_nodelay) {
|
||||||
trace!("error trying to set TCP nodelay: {}", e);
|
trace!("error trying to set TCP nodelay: {}", e);
|
||||||
}
|
}
|
||||||
let local_addr = socket.local_addr()?;
|
return Poll::Ready(Ok(socket));
|
||||||
return Poll::Ready(Ok(AddrStream::new(socket, remote_addr, local_addr)));
|
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Connection errors can be ignored directly, continue by
|
// Connection errors can be ignored directly, continue by
|
||||||
@@ -155,7 +152,7 @@ impl AddrIncoming {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Accept for AddrIncoming {
|
impl Accept for AddrIncoming {
|
||||||
type Conn = AddrStream;
|
type Conn = TcpStream;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
fn poll_accept(
|
fn poll_accept(
|
||||||
@@ -193,126 +190,3 @@ impl fmt::Debug for AddrIncoming {
|
|||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mod addr_stream {
|
|
||||||
use std::io;
|
|
||||||
use std::net::SocketAddr;
|
|
||||||
#[cfg(unix)]
|
|
||||||
use std::os::unix::io::{AsRawFd, RawFd};
|
|
||||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
|
||||||
use tokio::net::TcpStream;
|
|
||||||
|
|
||||||
use crate::common::{task, Pin, Poll};
|
|
||||||
|
|
||||||
pin_project_lite::pin_project! {
|
|
||||||
/// A transport returned yieled by `AddrIncoming`.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct AddrStream {
|
|
||||||
#[pin]
|
|
||||||
inner: TcpStream,
|
|
||||||
pub(super) remote_addr: SocketAddr,
|
|
||||||
pub(super) local_addr: SocketAddr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AddrStream {
|
|
||||||
pub(super) fn new(
|
|
||||||
tcp: TcpStream,
|
|
||||||
remote_addr: SocketAddr,
|
|
||||||
local_addr: SocketAddr,
|
|
||||||
) -> AddrStream {
|
|
||||||
AddrStream {
|
|
||||||
inner: tcp,
|
|
||||||
remote_addr,
|
|
||||||
local_addr,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the remote (peer) address of this connection.
|
|
||||||
#[inline]
|
|
||||||
pub fn remote_addr(&self) -> SocketAddr {
|
|
||||||
self.remote_addr
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the local address of this connection.
|
|
||||||
#[inline]
|
|
||||||
pub fn local_addr(&self) -> SocketAddr {
|
|
||||||
self.local_addr
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Consumes the AddrStream and returns the underlying IO object
|
|
||||||
#[inline]
|
|
||||||
pub fn into_inner(self) -> TcpStream {
|
|
||||||
self.inner
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Attempt to receive data on the socket, without removing that data
|
|
||||||
/// from the queue, registering the current task for wakeup if data is
|
|
||||||
/// not yet available.
|
|
||||||
pub fn poll_peek(
|
|
||||||
&mut self,
|
|
||||||
cx: &mut task::Context<'_>,
|
|
||||||
buf: &mut tokio::io::ReadBuf<'_>,
|
|
||||||
) -> Poll<io::Result<usize>> {
|
|
||||||
self.inner.poll_peek(cx, buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncRead for AddrStream {
|
|
||||||
#[inline]
|
|
||||||
fn poll_read(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut task::Context<'_>,
|
|
||||||
buf: &mut ReadBuf<'_>,
|
|
||||||
) -> Poll<io::Result<()>> {
|
|
||||||
self.project().inner.poll_read(cx, buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncWrite for AddrStream {
|
|
||||||
#[inline]
|
|
||||||
fn poll_write(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut task::Context<'_>,
|
|
||||||
buf: &[u8],
|
|
||||||
) -> Poll<io::Result<usize>> {
|
|
||||||
self.project().inner.poll_write(cx, buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn poll_write_vectored(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut task::Context<'_>,
|
|
||||||
bufs: &[io::IoSlice<'_>],
|
|
||||||
) -> Poll<io::Result<usize>> {
|
|
||||||
self.project().inner.poll_write_vectored(cx, bufs)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
|
|
||||||
// TCP flush is a noop
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
|
|
||||||
self.project().inner.poll_shutdown(cx)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn is_write_vectored(&self) -> bool {
|
|
||||||
// Note that since `self.inner` is a `TcpStream`, this could
|
|
||||||
// *probably* be hard-coded to return `true`...but it seems more
|
|
||||||
// correct to ask it anyway (maybe we're on some platform without
|
|
||||||
// scatter-gather IO?)
|
|
||||||
self.inner.is_write_vectored()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(unix)]
|
|
||||||
impl AsRawFd for AddrStream {
|
|
||||||
fn as_raw_fd(&self) -> RawFd {
|
|
||||||
self.inner.as_raw_fd()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -108,13 +108,13 @@ where
|
|||||||
/// # async fn run() {
|
/// # async fn run() {
|
||||||
/// use std::convert::Infallible;
|
/// use std::convert::Infallible;
|
||||||
/// use hyper::{Body, Request, Response, Server};
|
/// use hyper::{Body, Request, Response, Server};
|
||||||
/// use hyper::server::conn::AddrStream;
|
/// use tokio::net::TcpStream;
|
||||||
/// use hyper::service::{make_service_fn, service_fn};
|
/// use hyper::service::{make_service_fn, service_fn};
|
||||||
///
|
///
|
||||||
/// let addr = ([127, 0, 0, 1], 3000).into();
|
/// let addr = ([127, 0, 0, 1], 3000).into();
|
||||||
///
|
///
|
||||||
/// let make_svc = make_service_fn(|socket: &AddrStream| {
|
/// let make_svc = make_service_fn(|socket: &TcpStream| {
|
||||||
/// let remote_addr = socket.remote_addr();
|
/// let remote_addr = socket.peer_addr().unwrap();
|
||||||
/// async move {
|
/// async move {
|
||||||
/// Ok::<_, Infallible>(service_fn(move |_: Request<Body>| async move {
|
/// Ok::<_, Infallible>(service_fn(move |_: Request<Body>| async move {
|
||||||
/// Ok::<_, Infallible>(
|
/// Ok::<_, Infallible>(
|
||||||
|
|||||||
Reference in New Issue
Block a user