feat(server): remove the high-level Server API (#2932)
This removes `hyper::Server`, and it's related parts: - `hyper::server::Builder` - `hyper::server::accept` - `hyper::service::make_service_fn` New utilities for managing servers will exist in `hyper-util`.
This commit is contained in:
@@ -1,217 +0,0 @@
|
|||||||
use std::mem;
|
|
||||||
|
|
||||||
use pin_project_lite::pin_project;
|
|
||||||
use tokio::sync::watch;
|
|
||||||
|
|
||||||
use super::{task, Future, Pin, Poll};
|
|
||||||
|
|
||||||
pub(crate) fn channel() -> (Signal, Watch) {
|
|
||||||
let (tx, rx) = watch::channel(());
|
|
||||||
(Signal { tx }, Watch { rx })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct Signal {
|
|
||||||
tx: watch::Sender<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct Draining(Pin<Box<dyn Future<Output = ()> + Send + Sync>>);
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub(crate) struct Watch {
|
|
||||||
rx: watch::Receiver<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pin_project! {
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
pub struct Watching<F, FN> {
|
|
||||||
#[pin]
|
|
||||||
future: F,
|
|
||||||
state: State<FN>,
|
|
||||||
watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
|
|
||||||
_rx: watch::Receiver<()>,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum State<F> {
|
|
||||||
Watch(F),
|
|
||||||
Draining,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Signal {
|
|
||||||
pub(crate) fn drain(self) -> Draining {
|
|
||||||
let _ = self.tx.send(());
|
|
||||||
Draining(Box::pin(async move { self.tx.closed().await }))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Future for Draining {
|
|
||||||
type Output = ();
|
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
|
||||||
Pin::new(&mut self.as_mut().0).poll(cx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Watch {
|
|
||||||
pub(crate) fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN>
|
|
||||||
where
|
|
||||||
F: Future,
|
|
||||||
FN: FnOnce(Pin<&mut F>),
|
|
||||||
{
|
|
||||||
let Self { mut rx } = self;
|
|
||||||
let _rx = rx.clone();
|
|
||||||
Watching {
|
|
||||||
future,
|
|
||||||
state: State::Watch(on_drain),
|
|
||||||
watch: Box::pin(async move {
|
|
||||||
let _ = rx.changed().await;
|
|
||||||
}),
|
|
||||||
// Keep the receiver alive until the future completes, so that
|
|
||||||
// dropping it can signal that draining has completed.
|
|
||||||
_rx,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F, FN> Future for Watching<F, FN>
|
|
||||||
where
|
|
||||||
F: Future,
|
|
||||||
FN: FnOnce(Pin<&mut F>),
|
|
||||||
{
|
|
||||||
type Output = F::Output;
|
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let mut me = self.project();
|
|
||||||
loop {
|
|
||||||
match mem::replace(me.state, State::Draining) {
|
|
||||||
State::Watch(on_drain) => {
|
|
||||||
match Pin::new(&mut me.watch).poll(cx) {
|
|
||||||
Poll::Ready(()) => {
|
|
||||||
// Drain has been triggered!
|
|
||||||
on_drain(me.future.as_mut());
|
|
||||||
}
|
|
||||||
Poll::Pending => {
|
|
||||||
*me.state = State::Watch(on_drain);
|
|
||||||
return me.future.poll(cx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
State::Draining => return me.future.poll(cx),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
struct TestMe {
|
|
||||||
draining: bool,
|
|
||||||
finished: bool,
|
|
||||||
poll_cnt: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Future for TestMe {
|
|
||||||
type Output = ();
|
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
|
|
||||||
self.poll_cnt += 1;
|
|
||||||
if self.finished {
|
|
||||||
Poll::Ready(())
|
|
||||||
} else {
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn watch() {
|
|
||||||
let mut mock = tokio_test::task::spawn(());
|
|
||||||
mock.enter(|cx, _| {
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
let fut = TestMe {
|
|
||||||
draining: false,
|
|
||||||
finished: false,
|
|
||||||
poll_cnt: 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut watch = rx.watch(fut, |mut fut| {
|
|
||||||
fut.draining = true;
|
|
||||||
});
|
|
||||||
|
|
||||||
assert_eq!(watch.future.poll_cnt, 0);
|
|
||||||
|
|
||||||
// First poll should poll the inner future
|
|
||||||
assert!(Pin::new(&mut watch).poll(cx).is_pending());
|
|
||||||
assert_eq!(watch.future.poll_cnt, 1);
|
|
||||||
|
|
||||||
// Second poll should poll the inner future again
|
|
||||||
assert!(Pin::new(&mut watch).poll(cx).is_pending());
|
|
||||||
assert_eq!(watch.future.poll_cnt, 2);
|
|
||||||
|
|
||||||
let mut draining = tx.drain();
|
|
||||||
// Drain signaled, but needs another poll to be noticed.
|
|
||||||
assert!(!watch.future.draining);
|
|
||||||
assert_eq!(watch.future.poll_cnt, 2);
|
|
||||||
|
|
||||||
// Now, poll after drain has been signaled.
|
|
||||||
assert!(Pin::new(&mut watch).poll(cx).is_pending());
|
|
||||||
assert_eq!(watch.future.poll_cnt, 3);
|
|
||||||
assert!(watch.future.draining);
|
|
||||||
|
|
||||||
// Draining is not ready until watcher completes
|
|
||||||
assert!(Pin::new(&mut draining).poll(cx).is_pending());
|
|
||||||
|
|
||||||
// Finishing up the watch future
|
|
||||||
watch.future.finished = true;
|
|
||||||
assert!(Pin::new(&mut watch).poll(cx).is_ready());
|
|
||||||
assert_eq!(watch.future.poll_cnt, 4);
|
|
||||||
drop(watch);
|
|
||||||
|
|
||||||
assert!(Pin::new(&mut draining).poll(cx).is_ready());
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn watch_clones() {
|
|
||||||
let mut mock = tokio_test::task::spawn(());
|
|
||||||
mock.enter(|cx, _| {
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
|
|
||||||
let fut1 = TestMe {
|
|
||||||
draining: false,
|
|
||||||
finished: false,
|
|
||||||
poll_cnt: 0,
|
|
||||||
};
|
|
||||||
let fut2 = TestMe {
|
|
||||||
draining: false,
|
|
||||||
finished: false,
|
|
||||||
poll_cnt: 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
let watch1 = rx.clone().watch(fut1, |mut fut| {
|
|
||||||
fut.draining = true;
|
|
||||||
});
|
|
||||||
let watch2 = rx.watch(fut2, |mut fut| {
|
|
||||||
fut.draining = true;
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut draining = tx.drain();
|
|
||||||
|
|
||||||
// Still 2 outstanding watchers
|
|
||||||
assert!(Pin::new(&mut draining).poll(cx).is_pending());
|
|
||||||
|
|
||||||
// drop 1 for whatever reason
|
|
||||||
drop(watch1);
|
|
||||||
|
|
||||||
// Still not ready, 1 other watcher still pending
|
|
||||||
assert!(Pin::new(&mut draining).poll(cx).is_pending());
|
|
||||||
|
|
||||||
drop(watch2);
|
|
||||||
|
|
||||||
// Now all watchers are gone, draining is complete
|
|
||||||
assert!(Pin::new(&mut draining).poll(cx).is_ready());
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -3,28 +3,17 @@ use std::future::Future;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
|
|
||||||
use crate::body::Body;
|
|
||||||
#[cfg(feature = "server")]
|
#[cfg(feature = "server")]
|
||||||
use crate::body::HttpBody;
|
use crate::body::HttpBody;
|
||||||
#[cfg(all(feature = "http2", feature = "server"))]
|
#[cfg(all(feature = "http2", feature = "server"))]
|
||||||
use crate::proto::h2::server::H2Stream;
|
use crate::proto::h2::server::H2Stream;
|
||||||
use crate::rt::Executor;
|
use crate::rt::Executor;
|
||||||
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
|
|
||||||
use crate::server::server::{new_svc::NewSvcTask, Watcher};
|
|
||||||
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
|
|
||||||
use crate::service::HttpService;
|
|
||||||
|
|
||||||
#[cfg(feature = "server")]
|
#[cfg(feature = "server")]
|
||||||
pub trait ConnStreamExec<F, B: HttpBody>: Clone {
|
pub trait ConnStreamExec<F, B: HttpBody>: Clone {
|
||||||
fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
|
fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
|
|
||||||
pub trait NewSvcExec<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>>: Clone {
|
|
||||||
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
|
pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
|
||||||
|
|
||||||
// Either the user provides an executor for background tasks, or we use
|
// Either the user provides an executor for background tasks, or we use
|
||||||
@@ -78,18 +67,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
|
|
||||||
impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec
|
|
||||||
where
|
|
||||||
NewSvcTask<I, N, S, E, W>: Future<Output = ()> + Send + 'static,
|
|
||||||
S: HttpService<Body>,
|
|
||||||
W: Watcher<I, S, E>,
|
|
||||||
{
|
|
||||||
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
|
|
||||||
self.execute(fut)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==== impl Executor =====
|
// ==== impl Executor =====
|
||||||
|
|
||||||
#[cfg(feature = "server")]
|
#[cfg(feature = "server")]
|
||||||
@@ -104,19 +81,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
|
|
||||||
impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E
|
|
||||||
where
|
|
||||||
E: Executor<NewSvcTask<I, N, S, E, W>> + Clone,
|
|
||||||
NewSvcTask<I, N, S, E, W>: Future<Output = ()>,
|
|
||||||
S: HttpService<Body>,
|
|
||||||
W: Watcher<I, S, E>,
|
|
||||||
{
|
|
||||||
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
|
|
||||||
self.execute(fut)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If http2 is not enable, we just have a stub here, so that the trait bounds
|
// If http2 is not enable, we just have a stub here, so that the trait bounds
|
||||||
// that *would* have been needed are still checked. Why?
|
// that *would* have been needed are still checked. Why?
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -10,8 +10,6 @@ macro_rules! ready {
|
|||||||
pub(crate) mod buf;
|
pub(crate) mod buf;
|
||||||
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
|
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
|
||||||
pub(crate) mod date;
|
pub(crate) mod date;
|
||||||
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
|
|
||||||
pub(crate) mod drain;
|
|
||||||
#[cfg(any(feature = "http1", feature = "http2", feature = "server"))]
|
#[cfg(any(feature = "http1", feature = "http2", feature = "server"))]
|
||||||
pub(crate) mod exec;
|
pub(crate) mod exec;
|
||||||
pub(crate) mod io;
|
pub(crate) mod io;
|
||||||
|
|||||||
26
src/error.rs
26
src/error.rs
@@ -40,10 +40,6 @@ pub(super) enum Kind {
|
|||||||
/// Error creating a TcpListener.
|
/// Error creating a TcpListener.
|
||||||
#[cfg(all(feature = "tcp", feature = "server"))]
|
#[cfg(all(feature = "tcp", feature = "server"))]
|
||||||
Listen,
|
Listen,
|
||||||
/// Error accepting on an Incoming stream.
|
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
|
||||||
#[cfg(feature = "server")]
|
|
||||||
Accept,
|
|
||||||
/// User took too long to send headers
|
/// User took too long to send headers
|
||||||
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
|
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
|
||||||
HeaderTimeout,
|
HeaderTimeout,
|
||||||
@@ -96,10 +92,6 @@ pub(super) enum User {
|
|||||||
Body,
|
Body,
|
||||||
/// The user aborted writing of the outgoing body.
|
/// The user aborted writing of the outgoing body.
|
||||||
BodyWriteAborted,
|
BodyWriteAborted,
|
||||||
/// Error calling user's MakeService.
|
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
|
||||||
#[cfg(feature = "server")]
|
|
||||||
MakeService,
|
|
||||||
/// Error from future of user's Service.
|
/// Error from future of user's Service.
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
#[cfg(any(feature = "http1", feature = "http2"))]
|
||||||
Service,
|
Service,
|
||||||
@@ -278,12 +270,6 @@ impl Error {
|
|||||||
Error::new(Kind::Listen).with(cause)
|
Error::new(Kind::Listen).with(cause)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
|
||||||
#[cfg(feature = "server")]
|
|
||||||
pub(super) fn new_accept<E: Into<Cause>>(cause: E) -> Error {
|
|
||||||
Error::new(Kind::Accept).with(cause)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
#[cfg(any(feature = "http1", feature = "http2"))]
|
||||||
#[cfg(feature = "client")]
|
#[cfg(feature = "client")]
|
||||||
pub(super) fn new_connect<E: Into<Cause>>(cause: E) -> Error {
|
pub(super) fn new_connect<E: Into<Cause>>(cause: E) -> Error {
|
||||||
@@ -356,12 +342,6 @@ impl Error {
|
|||||||
Error::new_user(User::ManualUpgrade)
|
Error::new_user(User::ManualUpgrade)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
|
||||||
#[cfg(feature = "server")]
|
|
||||||
pub(super) fn new_user_make_service<E: Into<Cause>>(cause: E) -> Error {
|
|
||||||
Error::new_user(User::MakeService).with(cause)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
#[cfg(any(feature = "http1", feature = "http2"))]
|
||||||
pub(super) fn new_user_service<E: Into<Cause>>(cause: E) -> Error {
|
pub(super) fn new_user_service<E: Into<Cause>>(cause: E) -> Error {
|
||||||
Error::new_user(User::Service).with(cause)
|
Error::new_user(User::Service).with(cause)
|
||||||
@@ -435,9 +415,6 @@ impl Error {
|
|||||||
Kind::Canceled => "operation was canceled",
|
Kind::Canceled => "operation was canceled",
|
||||||
#[cfg(all(feature = "server", feature = "tcp"))]
|
#[cfg(all(feature = "server", feature = "tcp"))]
|
||||||
Kind::Listen => "error creating server listener",
|
Kind::Listen => "error creating server listener",
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
|
||||||
#[cfg(feature = "server")]
|
|
||||||
Kind::Accept => "error accepting connection",
|
|
||||||
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
|
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
|
||||||
Kind::HeaderTimeout => "read header from client timeout",
|
Kind::HeaderTimeout => "read header from client timeout",
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
#[cfg(any(feature = "http1", feature = "http2"))]
|
||||||
@@ -455,9 +432,6 @@ impl Error {
|
|||||||
Kind::User(User::Body) => "error from user's HttpBody stream",
|
Kind::User(User::Body) => "error from user's HttpBody stream",
|
||||||
Kind::User(User::BodyWriteAborted) => "user body write aborted",
|
Kind::User(User::BodyWriteAborted) => "user body write aborted",
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
#[cfg(any(feature = "http1", feature = "http2"))]
|
||||||
#[cfg(feature = "server")]
|
|
||||||
Kind::User(User::MakeService) => "error from user's MakeService",
|
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
|
||||||
Kind::User(User::Service) => "error from user's Service",
|
Kind::User(User::Service) => "error from user's Service",
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
#[cfg(any(feature = "http1", feature = "http2"))]
|
||||||
#[cfg(feature = "server")]
|
#[cfg(feature = "server")]
|
||||||
|
|||||||
@@ -102,6 +102,4 @@ cfg_feature! {
|
|||||||
#![feature = "server"]
|
#![feature = "server"]
|
||||||
|
|
||||||
pub mod server;
|
pub mod server;
|
||||||
#[doc(no_inline)]
|
|
||||||
pub use crate::server::Server;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,71 +0,0 @@
|
|||||||
//! The `Accept` trait and supporting types.
|
|
||||||
//!
|
|
||||||
//! This module contains:
|
|
||||||
//!
|
|
||||||
//! - The [`Accept`](Accept) trait used to asynchronously accept incoming
|
|
||||||
//! connections.
|
|
||||||
//! - Utilities like `poll_fn` to ease creating a custom `Accept`.
|
|
||||||
|
|
||||||
use crate::common::{
|
|
||||||
task::{self, Poll},
|
|
||||||
Pin,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Asynchronously accept incoming connections.
|
|
||||||
pub trait Accept {
|
|
||||||
/// The connection type that can be accepted.
|
|
||||||
type Conn;
|
|
||||||
/// The error type that can occur when accepting a connection.
|
|
||||||
type Error;
|
|
||||||
|
|
||||||
/// Poll to accept the next connection.
|
|
||||||
fn poll_accept(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut task::Context<'_>,
|
|
||||||
) -> Poll<Option<Result<Self::Conn, Self::Error>>>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create an `Accept` with a polling function.
|
|
||||||
///
|
|
||||||
/// # Example
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use std::task::Poll;
|
|
||||||
/// use hyper::server::{accept, Server};
|
|
||||||
///
|
|
||||||
/// # let mock_conn = ();
|
|
||||||
/// // If we created some mocked connection...
|
|
||||||
/// let mut conn = Some(mock_conn);
|
|
||||||
///
|
|
||||||
/// // And accept just the mocked conn once...
|
|
||||||
/// let once = accept::poll_fn(move |cx| {
|
|
||||||
/// Poll::Ready(conn.take().map(Ok::<_, ()>))
|
|
||||||
/// });
|
|
||||||
///
|
|
||||||
/// let builder = Server::builder(once);
|
|
||||||
/// ```
|
|
||||||
pub fn poll_fn<F, IO, E>(func: F) -> impl Accept<Conn = IO, Error = E>
|
|
||||||
where
|
|
||||||
F: FnMut(&mut task::Context<'_>) -> Poll<Option<Result<IO, E>>>,
|
|
||||||
{
|
|
||||||
struct PollFn<F>(F);
|
|
||||||
|
|
||||||
// The closure `F` is never pinned
|
|
||||||
impl<F> Unpin for PollFn<F> {}
|
|
||||||
|
|
||||||
impl<F, IO, E> Accept for PollFn<F>
|
|
||||||
where
|
|
||||||
F: FnMut(&mut task::Context<'_>) -> Poll<Option<Result<IO, E>>>,
|
|
||||||
{
|
|
||||||
type Conn = IO;
|
|
||||||
type Error = E;
|
|
||||||
fn poll_accept(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut task::Context<'_>,
|
|
||||||
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
|
|
||||||
(self.get_mut().0)(cx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
PollFn(func)
|
|
||||||
}
|
|
||||||
@@ -5,9 +5,6 @@
|
|||||||
//! are not handled at this level. This module provides the building blocks to
|
//! are not handled at this level. This module provides the building blocks to
|
||||||
//! customize those things externally.
|
//! customize those things externally.
|
||||||
//!
|
//!
|
||||||
//! If you don't have need to manage connections yourself, consider using the
|
|
||||||
//! higher-level [Server](super) API.
|
|
||||||
//!
|
|
||||||
//! ## Example
|
//! ## Example
|
||||||
//! A simple example that uses the `Http` struct to talk HTTP over a Tokio TCP stream
|
//! A simple example that uses the `Http` struct to talk HTTP over a Tokio TCP stream
|
||||||
//! ```no_run
|
//! ```no_run
|
||||||
@@ -69,7 +66,6 @@ cfg_feature! {
|
|||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
pub use super::server::Connecting;
|
|
||||||
use crate::body::{Body, HttpBody};
|
use crate::body::{Body, HttpBody};
|
||||||
use crate::common::{task, Future, Pin, Poll, Unpin};
|
use crate::common::{task, Future, Pin, Poll, Unpin};
|
||||||
#[cfg(not(all(feature = "http1", feature = "http2")))]
|
#[cfg(not(all(feature = "http1", feature = "http2")))]
|
||||||
@@ -84,9 +80,6 @@ cfg_feature! {
|
|||||||
/// A lower-level configuration of the HTTP protocol.
|
/// A lower-level configuration of the HTTP protocol.
|
||||||
///
|
///
|
||||||
/// This structure is used to configure options for an HTTP server connection.
|
/// This structure is used to configure options for an HTTP server connection.
|
||||||
///
|
|
||||||
/// If you don't have need to manage connections yourself, consider using the
|
|
||||||
/// higher-level [Server](super) API.
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
#[cfg(any(feature = "http1", feature = "http2"))]
|
||||||
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
|
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
|
||||||
|
|||||||
@@ -1,37 +1,10 @@
|
|||||||
//! HTTP Server
|
//! HTTP Server
|
||||||
//!
|
//!
|
||||||
//! A `Server` is created to listen on a port, parse HTTP requests, and hand
|
//! A "server" is usually created by listening on a port for new connections,
|
||||||
//! them off to a `Service`.
|
//! parse HTTP requests, and hand them off to a `Service`.
|
||||||
//!
|
//!
|
||||||
//! There are two levels of APIs provide for constructing HTTP servers:
|
//! How exactly you choose to listen for connections is not something hyper
|
||||||
//!
|
//! concerns itself with. After you have a connection, you can handle HTTP over
|
||||||
//! - The higher-level [`Server`](Server) type.
|
//! it with the types in the [`conn`](conn) module.
|
||||||
//! - The lower-level [`conn`](conn) module.
|
|
||||||
//!
|
|
||||||
//! # Server
|
|
||||||
//!
|
|
||||||
//! The [`Server`](Server) is main way to start listening for HTTP requests.
|
|
||||||
//! It wraps a listener with a [`MakeService`](crate::service), and then should
|
|
||||||
//! be executed to start serving requests.
|
|
||||||
//!
|
|
||||||
//! [`Server`](Server) accepts connections in both HTTP1 and HTTP2 by default.
|
|
||||||
pub mod accept;
|
|
||||||
pub mod conn;
|
pub mod conn;
|
||||||
|
|
||||||
pub use self::server::Server;
|
|
||||||
|
|
||||||
cfg_feature! {
|
|
||||||
#![any(feature = "http1", feature = "http2")]
|
|
||||||
|
|
||||||
pub(crate) mod server;
|
|
||||||
pub use self::server::Builder;
|
|
||||||
|
|
||||||
mod shutdown;
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg_feature! {
|
|
||||||
#![not(any(feature = "http1", feature = "http2"))]
|
|
||||||
|
|
||||||
mod server_stub;
|
|
||||||
use server_stub as server;
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,622 +0,0 @@
|
|||||||
use std::error::Error as StdError;
|
|
||||||
use std::fmt;
|
|
||||||
#[cfg(feature = "http1")]
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use pin_project_lite::pin_project;
|
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
|
||||||
use tracing::trace;
|
|
||||||
|
|
||||||
use super::accept::Accept;
|
|
||||||
use crate::body::{Body, HttpBody};
|
|
||||||
use crate::common::exec::Exec;
|
|
||||||
use crate::common::exec::{ConnStreamExec, NewSvcExec};
|
|
||||||
use crate::common::{task, Future, Pin, Poll, Unpin};
|
|
||||||
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
|
|
||||||
// error that `hyper::server::Http` is private...
|
|
||||||
use super::conn::{Connection, Http as Http_, UpgradeableConnection};
|
|
||||||
use super::shutdown::{Graceful, GracefulWatcher};
|
|
||||||
use crate::service::{HttpService, MakeServiceRef};
|
|
||||||
|
|
||||||
use self::new_svc::NewSvcTask;
|
|
||||||
|
|
||||||
pin_project! {
|
|
||||||
/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
|
|
||||||
///
|
|
||||||
/// `Server` is a `Future` mapping a bound listener with a set of service
|
|
||||||
/// handlers. It is built using the [`Builder`](Builder), and the future
|
|
||||||
/// completes when the server has been shutdown. It should be run by an
|
|
||||||
/// `Executor`.
|
|
||||||
pub struct Server<I, S, E = Exec> {
|
|
||||||
#[pin]
|
|
||||||
incoming: I,
|
|
||||||
make_service: S,
|
|
||||||
protocol: Http_<E>,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A builder for a [`Server`](Server).
|
|
||||||
#[derive(Debug)]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
|
|
||||||
pub struct Builder<I, E = Exec> {
|
|
||||||
incoming: I,
|
|
||||||
protocol: Http_<E>,
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===== impl Server =====
|
|
||||||
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
|
|
||||||
impl<I> Server<I, ()> {
|
|
||||||
/// Starts a [`Builder`](Builder) with the provided incoming stream.
|
|
||||||
pub fn builder(incoming: I) -> Builder<I> {
|
|
||||||
Builder {
|
|
||||||
incoming,
|
|
||||||
protocol: Http_::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
|
|
||||||
impl<I, IO, IE, S, E, B> Server<I, S, E>
|
|
||||||
where
|
|
||||||
I: Accept<Conn = IO, Error = IE>,
|
|
||||||
IE: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
S: MakeServiceRef<IO, Body, ResBody = B>,
|
|
||||||
S::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
B: HttpBody + 'static,
|
|
||||||
B::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
|
|
||||||
{
|
|
||||||
/// Prepares a server to handle graceful shutdown when the provided future
|
|
||||||
/// completes.
|
|
||||||
pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F, E>
|
|
||||||
where
|
|
||||||
F: Future<Output = ()>,
|
|
||||||
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
|
|
||||||
{
|
|
||||||
Graceful::new(self, signal)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_next_(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut task::Context<'_>,
|
|
||||||
) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> {
|
|
||||||
let me = self.project();
|
|
||||||
match ready!(me.make_service.poll_ready_ref(cx)) {
|
|
||||||
Ok(()) => (),
|
|
||||||
Err(e) => {
|
|
||||||
trace!("make_service closed");
|
|
||||||
return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e))));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(item) = ready!(me.incoming.poll_accept(cx)) {
|
|
||||||
let io = item.map_err(crate::Error::new_accept)?;
|
|
||||||
let new_fut = me.make_service.make_service_ref(&io);
|
|
||||||
Poll::Ready(Some(Ok(Connecting {
|
|
||||||
future: new_fut,
|
|
||||||
io: Some(io),
|
|
||||||
protocol: me.protocol.clone(),
|
|
||||||
})))
|
|
||||||
} else {
|
|
||||||
Poll::Ready(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn poll_watch<W>(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut task::Context<'_>,
|
|
||||||
watcher: &W,
|
|
||||||
) -> Poll<crate::Result<()>>
|
|
||||||
where
|
|
||||||
E: NewSvcExec<IO, S::Future, S::Service, E, W>,
|
|
||||||
W: Watcher<IO, S::Service, E>,
|
|
||||||
{
|
|
||||||
loop {
|
|
||||||
if let Some(connecting) = ready!(self.as_mut().poll_next_(cx)?) {
|
|
||||||
let fut = NewSvcTask::new(connecting, watcher.clone());
|
|
||||||
self.as_mut().project().protocol.exec.execute_new_svc(fut);
|
|
||||||
} else {
|
|
||||||
return Poll::Ready(Ok(()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
|
|
||||||
impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
|
|
||||||
where
|
|
||||||
I: Accept<Conn = IO, Error = IE>,
|
|
||||||
IE: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
S: MakeServiceRef<IO, Body, ResBody = B>,
|
|
||||||
S::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
B: HttpBody + 'static,
|
|
||||||
B::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
|
|
||||||
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
|
|
||||||
{
|
|
||||||
type Output = crate::Result<()>;
|
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
|
||||||
self.poll_watch(cx, &NoopWatcher)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
let mut st = f.debug_struct("Server");
|
|
||||||
st.field("listener", &self.incoming);
|
|
||||||
st.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===== impl Builder =====
|
|
||||||
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
|
|
||||||
impl<I, E> Builder<I, E> {
|
|
||||||
/// Start a new builder, wrapping an incoming stream and low-level options.
|
|
||||||
pub fn new(incoming: I, protocol: Http_<E>) -> Self {
|
|
||||||
Builder { incoming, protocol }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets whether to use keep-alive for HTTP/1 connections.
|
|
||||||
///
|
|
||||||
/// Default is `true`.
|
|
||||||
#[cfg(feature = "http1")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
|
|
||||||
pub fn http1_keepalive(mut self, val: bool) -> Self {
|
|
||||||
self.protocol.http1_keep_alive(val);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set whether HTTP/1 connections should support half-closures.
|
|
||||||
///
|
|
||||||
/// Clients can chose to shutdown their write-side while waiting
|
|
||||||
/// for the server to respond. Setting this to `true` will
|
|
||||||
/// prevent closing the connection immediately if `read`
|
|
||||||
/// detects an EOF in the middle of a request.
|
|
||||||
///
|
|
||||||
/// Default is `false`.
|
|
||||||
#[cfg(feature = "http1")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
|
|
||||||
pub fn http1_half_close(mut self, val: bool) -> Self {
|
|
||||||
self.protocol.http1_half_close(val);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set the maximum buffer size.
|
|
||||||
///
|
|
||||||
/// Default is ~ 400kb.
|
|
||||||
#[cfg(feature = "http1")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
|
|
||||||
pub fn http1_max_buf_size(mut self, val: usize) -> Self {
|
|
||||||
self.protocol.max_buf_size(val);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sets whether to bunch up HTTP/1 writes until the read buffer is empty.
|
|
||||||
//
|
|
||||||
// This isn't really desirable in most cases, only really being useful in
|
|
||||||
// silly pipeline benchmarks.
|
|
||||||
#[doc(hidden)]
|
|
||||||
#[cfg(feature = "http1")]
|
|
||||||
pub fn http1_pipeline_flush(mut self, val: bool) -> Self {
|
|
||||||
self.protocol.pipeline_flush(val);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set whether HTTP/1 connections should try to use vectored writes,
|
|
||||||
/// or always flatten into a single buffer.
|
|
||||||
///
|
|
||||||
/// Note that setting this to false may mean more copies of body data,
|
|
||||||
/// but may also improve performance when an IO transport doesn't
|
|
||||||
/// support vectored writes well, such as most TLS implementations.
|
|
||||||
///
|
|
||||||
/// Setting this to true will force hyper to use queued strategy
|
|
||||||
/// which may eliminate unnecessary cloning on some TLS backends
|
|
||||||
///
|
|
||||||
/// Default is `auto`. In this mode hyper will try to guess which
|
|
||||||
/// mode to use
|
|
||||||
#[cfg(feature = "http1")]
|
|
||||||
pub fn http1_writev(mut self, enabled: bool) -> Self {
|
|
||||||
self.protocol.http1_writev(enabled);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set whether HTTP/1 connections will write header names as title case at
|
|
||||||
/// the socket level.
|
|
||||||
///
|
|
||||||
/// Note that this setting does not affect HTTP/2.
|
|
||||||
///
|
|
||||||
/// Default is false.
|
|
||||||
#[cfg(feature = "http1")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
|
|
||||||
pub fn http1_title_case_headers(mut self, val: bool) -> Self {
|
|
||||||
self.protocol.http1_title_case_headers(val);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set whether to support preserving original header cases.
|
|
||||||
///
|
|
||||||
/// Currently, this will record the original cases received, and store them
|
|
||||||
/// in a private extension on the `Request`. It will also look for and use
|
|
||||||
/// such an extension in any provided `Response`.
|
|
||||||
///
|
|
||||||
/// Since the relevant extension is still private, there is no way to
|
|
||||||
/// interact with the original cases. The only effect this can have now is
|
|
||||||
/// to forward the cases in a proxy-like fashion.
|
|
||||||
///
|
|
||||||
/// Note that this setting does not affect HTTP/2.
|
|
||||||
///
|
|
||||||
/// Default is false.
|
|
||||||
#[cfg(feature = "http1")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
|
|
||||||
pub fn http1_preserve_header_case(mut self, val: bool) -> Self {
|
|
||||||
self.protocol.http1_preserve_header_case(val);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set a timeout for reading client request headers. If a client does not
|
|
||||||
/// transmit the entire header within this time, the connection is closed.
|
|
||||||
///
|
|
||||||
/// Default is None.
|
|
||||||
#[cfg(all(feature = "http1", feature = "runtime"))]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))]
|
|
||||||
pub fn http1_header_read_timeout(mut self, read_timeout: Duration) -> Self {
|
|
||||||
self.protocol.http1_header_read_timeout(read_timeout);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets whether HTTP/1 is required.
|
|
||||||
///
|
|
||||||
/// Default is `false`.
|
|
||||||
#[cfg(feature = "http1")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
|
|
||||||
pub fn http1_only(mut self, val: bool) -> Self {
|
|
||||||
self.protocol.http1_only(val);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets whether HTTP/2 is required.
|
|
||||||
///
|
|
||||||
/// Default is `false`.
|
|
||||||
#[cfg(feature = "http2")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
|
||||||
pub fn http2_only(mut self, val: bool) -> Self {
|
|
||||||
self.protocol.http2_only(val);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
|
|
||||||
/// stream-level flow control.
|
|
||||||
///
|
|
||||||
/// Passing `None` will do nothing.
|
|
||||||
///
|
|
||||||
/// If not set, hyper will use a default.
|
|
||||||
///
|
|
||||||
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
|
|
||||||
#[cfg(feature = "http2")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
|
||||||
pub fn http2_initial_stream_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
|
|
||||||
self.protocol.http2_initial_stream_window_size(sz.into());
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets the max connection-level flow control for HTTP2
|
|
||||||
///
|
|
||||||
/// Passing `None` will do nothing.
|
|
||||||
///
|
|
||||||
/// If not set, hyper will use a default.
|
|
||||||
#[cfg(feature = "http2")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
|
||||||
pub fn http2_initial_connection_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
|
|
||||||
self.protocol
|
|
||||||
.http2_initial_connection_window_size(sz.into());
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets whether to use an adaptive flow control.
|
|
||||||
///
|
|
||||||
/// Enabling this will override the limits set in
|
|
||||||
/// `http2_initial_stream_window_size` and
|
|
||||||
/// `http2_initial_connection_window_size`.
|
|
||||||
#[cfg(feature = "http2")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
|
||||||
pub fn http2_adaptive_window(mut self, enabled: bool) -> Self {
|
|
||||||
self.protocol.http2_adaptive_window(enabled);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets the maximum frame size to use for HTTP2.
|
|
||||||
///
|
|
||||||
/// Passing `None` will do nothing.
|
|
||||||
///
|
|
||||||
/// If not set, hyper will use a default.
|
|
||||||
#[cfg(feature = "http2")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
|
||||||
pub fn http2_max_frame_size(mut self, sz: impl Into<Option<u32>>) -> Self {
|
|
||||||
self.protocol.http2_max_frame_size(sz);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
|
|
||||||
/// connections.
|
|
||||||
///
|
|
||||||
/// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
|
|
||||||
///
|
|
||||||
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
|
|
||||||
#[cfg(feature = "http2")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
|
||||||
pub fn http2_max_concurrent_streams(mut self, max: impl Into<Option<u32>>) -> Self {
|
|
||||||
self.protocol.http2_max_concurrent_streams(max.into());
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
|
|
||||||
/// connection alive.
|
|
||||||
///
|
|
||||||
/// Pass `None` to disable HTTP2 keep-alive.
|
|
||||||
///
|
|
||||||
/// Default is currently disabled.
|
|
||||||
///
|
|
||||||
/// # Cargo Feature
|
|
||||||
///
|
|
||||||
/// Requires the `runtime` cargo feature to be enabled.
|
|
||||||
#[cfg(all(feature = "runtime", feature = "http2"))]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
|
||||||
pub fn http2_keep_alive_interval(mut self, interval: impl Into<Option<Duration>>) -> Self {
|
|
||||||
self.protocol.http2_keep_alive_interval(interval);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
|
|
||||||
///
|
|
||||||
/// If the ping is not acknowledged within the timeout, the connection will
|
|
||||||
/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
|
|
||||||
///
|
|
||||||
/// Default is 20 seconds.
|
|
||||||
///
|
|
||||||
/// # Cargo Feature
|
|
||||||
///
|
|
||||||
/// Requires the `runtime` cargo feature to be enabled.
|
|
||||||
#[cfg(all(feature = "runtime", feature = "http2"))]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
|
||||||
pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
|
|
||||||
self.protocol.http2_keep_alive_timeout(timeout);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set the maximum write buffer size for each HTTP/2 stream.
|
|
||||||
///
|
|
||||||
/// Default is currently ~400KB, but may change.
|
|
||||||
///
|
|
||||||
/// # Panics
|
|
||||||
///
|
|
||||||
/// The value must be no larger than `u32::MAX`.
|
|
||||||
#[cfg(feature = "http2")]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
|
|
||||||
pub fn http2_max_send_buf_size(mut self, max: usize) -> Self {
|
|
||||||
self.protocol.http2_max_send_buf_size(max);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Enables the [extended CONNECT protocol].
|
|
||||||
///
|
|
||||||
/// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
|
|
||||||
#[cfg(feature = "http2")]
|
|
||||||
pub fn http2_enable_connect_protocol(mut self) -> Self {
|
|
||||||
self.protocol.http2_enable_connect_protocol();
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets the `Executor` to deal with connection tasks.
|
|
||||||
///
|
|
||||||
/// Default is `tokio::spawn`.
|
|
||||||
pub fn executor<E2>(self, executor: E2) -> Builder<I, E2> {
|
|
||||||
Builder {
|
|
||||||
incoming: self.incoming,
|
|
||||||
protocol: self.protocol.with_executor(executor),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Consume this `Builder`, creating a [`Server`](Server).
|
|
||||||
pub fn serve<S, B>(self, make_service: S) -> Server<I, S, E>
|
|
||||||
where
|
|
||||||
I: Accept,
|
|
||||||
I::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
S: MakeServiceRef<I::Conn, Body, ResBody = B>,
|
|
||||||
S::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
B: HttpBody + 'static,
|
|
||||||
B::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
|
|
||||||
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
|
|
||||||
{
|
|
||||||
Server {
|
|
||||||
incoming: self.incoming,
|
|
||||||
make_service,
|
|
||||||
protocol: self.protocol.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Used by `Server` to optionally watch a `Connection` future.
|
|
||||||
//
|
|
||||||
// The regular `hyper::Server` just uses a `NoopWatcher`, which does
|
|
||||||
// not need to watch anything, and so returns the `Connection` untouched.
|
|
||||||
//
|
|
||||||
// The `Server::with_graceful_shutdown` needs to keep track of all active
|
|
||||||
// connections, and signal that they start to shutdown when prompted, so
|
|
||||||
// it has a `GracefulWatcher` implementation to do that.
|
|
||||||
pub trait Watcher<I, S: HttpService<Body>, E>: Clone {
|
|
||||||
type Future: Future<Output = crate::Result<()>>;
|
|
||||||
|
|
||||||
fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
#[derive(Copy, Clone)]
|
|
||||||
pub struct NoopWatcher;
|
|
||||||
|
|
||||||
impl<I, S, E> Watcher<I, S, E> for NoopWatcher
|
|
||||||
where
|
|
||||||
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
S: HttpService<Body>,
|
|
||||||
E: ConnStreamExec<S::Future, S::ResBody>,
|
|
||||||
S::ResBody: 'static,
|
|
||||||
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
{
|
|
||||||
type Future = UpgradeableConnection<I, S, E>;
|
|
||||||
|
|
||||||
fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future {
|
|
||||||
conn
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// used by exec.rs
|
|
||||||
pub(crate) mod new_svc {
|
|
||||||
use std::error::Error as StdError;
|
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
|
||||||
use tracing::debug;
|
|
||||||
|
|
||||||
use super::{Connecting, Watcher};
|
|
||||||
use crate::body::{Body, HttpBody};
|
|
||||||
use crate::common::exec::ConnStreamExec;
|
|
||||||
use crate::common::{task, Future, Pin, Poll, Unpin};
|
|
||||||
use crate::service::HttpService;
|
|
||||||
use pin_project_lite::pin_project;
|
|
||||||
|
|
||||||
// This is a `Future<Item=(), Error=()>` spawned to an `Executor` inside
|
|
||||||
// the `Server`. By being a nameable type, we can be generic over the
|
|
||||||
// user's `Service::Future`, and thus an `Executor` can execute it.
|
|
||||||
//
|
|
||||||
// Doing this allows for the server to conditionally require `Send` futures,
|
|
||||||
// depending on the `Executor` configured.
|
|
||||||
//
|
|
||||||
// Users cannot import this type, nor the associated `NewSvcExec`. Instead,
|
|
||||||
// a blanket implementation for `Executor<impl Future>` is sufficient.
|
|
||||||
|
|
||||||
pin_project! {
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
pub struct NewSvcTask<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
|
|
||||||
#[pin]
|
|
||||||
state: State<I, N, S, E, W>,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pin_project! {
|
|
||||||
#[project = StateProj]
|
|
||||||
pub(super) enum State<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
|
|
||||||
Connecting {
|
|
||||||
#[pin]
|
|
||||||
connecting: Connecting<I, N, E>,
|
|
||||||
watcher: W,
|
|
||||||
},
|
|
||||||
Connected {
|
|
||||||
#[pin]
|
|
||||||
future: W::Future,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> {
|
|
||||||
pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self {
|
|
||||||
NewSvcTask {
|
|
||||||
state: State::Connecting {
|
|
||||||
connecting,
|
|
||||||
watcher,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, N, S, NE, B, E, W> Future for NewSvcTask<I, N, S, E, W>
|
|
||||||
where
|
|
||||||
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
N: Future<Output = Result<S, NE>>,
|
|
||||||
NE: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
S: HttpService<Body, ResBody = B>,
|
|
||||||
B: HttpBody + 'static,
|
|
||||||
B::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
E: ConnStreamExec<S::Future, B>,
|
|
||||||
W: Watcher<I, S, E>,
|
|
||||||
{
|
|
||||||
type Output = ();
|
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
|
||||||
// If it weren't for needing to name this type so the `Send` bounds
|
|
||||||
// could be projected to the `Serve` executor, this could just be
|
|
||||||
// an `async fn`, and much safer. Woe is me.
|
|
||||||
|
|
||||||
let mut me = self.project();
|
|
||||||
loop {
|
|
||||||
let next = {
|
|
||||||
match me.state.as_mut().project() {
|
|
||||||
StateProj::Connecting {
|
|
||||||
connecting,
|
|
||||||
watcher,
|
|
||||||
} => {
|
|
||||||
let res = ready!(connecting.poll(cx));
|
|
||||||
let conn = match res {
|
|
||||||
Ok(conn) => conn,
|
|
||||||
Err(err) => {
|
|
||||||
let err = crate::Error::new_user_make_service(err);
|
|
||||||
debug!("connecting error: {}", err);
|
|
||||||
return Poll::Ready(());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let future = watcher.watch(conn.with_upgrades());
|
|
||||||
State::Connected { future }
|
|
||||||
}
|
|
||||||
StateProj::Connected { future } => {
|
|
||||||
return future.poll(cx).map(|res| {
|
|
||||||
if let Err(err) = res {
|
|
||||||
debug!("connection error: {}", err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
me.state.set(next);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pin_project! {
|
|
||||||
/// A future building a new `Service` to a `Connection`.
|
|
||||||
///
|
|
||||||
/// Wraps the future returned from `MakeService` into one that returns
|
|
||||||
/// a `Connection`.
|
|
||||||
#[must_use = "futures do nothing unless polled"]
|
|
||||||
#[derive(Debug)]
|
|
||||||
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
|
|
||||||
pub struct Connecting<I, F, E = Exec> {
|
|
||||||
#[pin]
|
|
||||||
future: F,
|
|
||||||
io: Option<I>,
|
|
||||||
protocol: Http_<E>,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, F, S, FE, E, B> Future for Connecting<I, F, E>
|
|
||||||
where
|
|
||||||
I: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
F: Future<Output = Result<S, FE>>,
|
|
||||||
S: HttpService<Body, ResBody = B>,
|
|
||||||
B: HttpBody + 'static,
|
|
||||||
B::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
E: ConnStreamExec<S::Future, B>,
|
|
||||||
{
|
|
||||||
type Output = Result<Connection<I, S, E>, FE>;
|
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let mut me = self.project();
|
|
||||||
let service = ready!(me.future.poll(cx))?;
|
|
||||||
let io = Option::take(&mut me.io).expect("polled after complete");
|
|
||||||
Poll::Ready(Ok(me.protocol.serve_connection(io, service)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,16 +0,0 @@
|
|||||||
use std::fmt;
|
|
||||||
|
|
||||||
use crate::common::exec::Exec;
|
|
||||||
|
|
||||||
/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
|
|
||||||
///
|
|
||||||
/// Needs at least one of the `http1` and `http2` features to be activated to actually be useful.
|
|
||||||
pub struct Server<I, S, E = Exec> {
|
|
||||||
_marker: std::marker::PhantomData<(I, S, E)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
f.debug_struct("Server").finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,128 +0,0 @@
|
|||||||
use std::error::Error as StdError;
|
|
||||||
|
|
||||||
use pin_project_lite::pin_project;
|
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
|
||||||
use tracing::debug;
|
|
||||||
|
|
||||||
use super::accept::Accept;
|
|
||||||
use super::conn::UpgradeableConnection;
|
|
||||||
use super::server::{Server, Watcher};
|
|
||||||
use crate::body::{Body, HttpBody};
|
|
||||||
use crate::common::drain::{self, Draining, Signal, Watch, Watching};
|
|
||||||
use crate::common::exec::{ConnStreamExec, NewSvcExec};
|
|
||||||
use crate::common::{task, Future, Pin, Poll, Unpin};
|
|
||||||
use crate::service::{HttpService, MakeServiceRef};
|
|
||||||
|
|
||||||
pin_project! {
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
pub struct Graceful<I, S, F, E> {
|
|
||||||
#[pin]
|
|
||||||
state: State<I, S, F, E>,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pin_project! {
|
|
||||||
#[project = StateProj]
|
|
||||||
pub(super) enum State<I, S, F, E> {
|
|
||||||
Running {
|
|
||||||
drain: Option<(Signal, Watch)>,
|
|
||||||
#[pin]
|
|
||||||
server: Server<I, S, E>,
|
|
||||||
#[pin]
|
|
||||||
signal: F,
|
|
||||||
},
|
|
||||||
Draining { draining: Draining },
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, S, F, E> Graceful<I, S, F, E> {
|
|
||||||
pub(super) fn new(server: Server<I, S, E>, signal: F) -> Self {
|
|
||||||
let drain = Some(drain::channel());
|
|
||||||
Graceful {
|
|
||||||
state: State::Running {
|
|
||||||
drain,
|
|
||||||
server,
|
|
||||||
signal,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, IO, IE, S, B, F, E> Future for Graceful<I, S, F, E>
|
|
||||||
where
|
|
||||||
I: Accept<Conn = IO, Error = IE>,
|
|
||||||
IE: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
S: MakeServiceRef<IO, Body, ResBody = B>,
|
|
||||||
S::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
B: HttpBody + 'static,
|
|
||||||
B::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
F: Future<Output = ()>,
|
|
||||||
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
|
|
||||||
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
|
|
||||||
{
|
|
||||||
type Output = crate::Result<()>;
|
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let mut me = self.project();
|
|
||||||
loop {
|
|
||||||
let next = {
|
|
||||||
match me.state.as_mut().project() {
|
|
||||||
StateProj::Running {
|
|
||||||
drain,
|
|
||||||
server,
|
|
||||||
signal,
|
|
||||||
} => match signal.poll(cx) {
|
|
||||||
Poll::Ready(()) => {
|
|
||||||
debug!("signal received, starting graceful shutdown");
|
|
||||||
let sig = drain.take().expect("drain channel").0;
|
|
||||||
State::Draining {
|
|
||||||
draining: sig.drain(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Poll::Pending => {
|
|
||||||
let watch = drain.as_ref().expect("drain channel").1.clone();
|
|
||||||
return server.poll_watch(cx, &GracefulWatcher(watch));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
StateProj::Draining { ref mut draining } => {
|
|
||||||
return Pin::new(draining).poll(cx).map(Ok);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
me.state.set(next);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct GracefulWatcher(Watch);
|
|
||||||
|
|
||||||
impl<I, S, E> Watcher<I, S, E> for GracefulWatcher
|
|
||||||
where
|
|
||||||
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
||||||
S: HttpService<Body>,
|
|
||||||
E: ConnStreamExec<S::Future, S::ResBody>,
|
|
||||||
S::ResBody: 'static,
|
|
||||||
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
{
|
|
||||||
type Future =
|
|
||||||
Watching<UpgradeableConnection<I, S, E>, fn(Pin<&mut UpgradeableConnection<I, S, E>>)>;
|
|
||||||
|
|
||||||
fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future {
|
|
||||||
self.0.clone().watch(conn, on_drain)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_drain<I, S, E>(conn: Pin<&mut UpgradeableConnection<I, S, E>>)
|
|
||||||
where
|
|
||||||
S: HttpService<Body>,
|
|
||||||
S::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
I: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
S::ResBody: HttpBody + 'static,
|
|
||||||
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
E: ConnStreamExec<S::Future, S::ResBody>,
|
|
||||||
{
|
|
||||||
conn.graceful_shutdown()
|
|
||||||
}
|
|
||||||
@@ -1,10 +1,6 @@
|
|||||||
use std::error::Error as StdError;
|
|
||||||
use std::fmt;
|
|
||||||
|
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
use super::{HttpService, Service};
|
use super::Service;
|
||||||
use crate::body::HttpBody;
|
|
||||||
use crate::common::{task, Future, Poll};
|
use crate::common::{task, Future, Poll};
|
||||||
|
|
||||||
// The same "trait alias" as tower::MakeConnection, but inlined to reduce
|
// The same "trait alias" as tower::MakeConnection, but inlined to reduce
|
||||||
@@ -38,115 +34,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Just a sort-of "trait alias" of `MakeService`, not to be implemented
|
|
||||||
// by anyone, only used as bounds.
|
|
||||||
pub trait MakeServiceRef<Target, ReqBody>: self::sealed::Sealed<(Target, ReqBody)> {
|
|
||||||
type ResBody: HttpBody;
|
|
||||||
type Error: Into<Box<dyn StdError + Send + Sync>>;
|
|
||||||
type Service: HttpService<ReqBody, ResBody = Self::ResBody, Error = Self::Error>;
|
|
||||||
type MakeError: Into<Box<dyn StdError + Send + Sync>>;
|
|
||||||
type Future: Future<Output = Result<Self::Service, Self::MakeError>>;
|
|
||||||
|
|
||||||
// Acting like a #[non_exhaustive] for associated types of this trait.
|
|
||||||
//
|
|
||||||
// Basically, no one outside of hyper should be able to set this type
|
|
||||||
// or declare bounds on it, so it should prevent people from creating
|
|
||||||
// trait objects or otherwise writing code that requires using *all*
|
|
||||||
// of the associated types.
|
|
||||||
//
|
|
||||||
// Why? So we can add new associated types to this alias in the future,
|
|
||||||
// if necessary.
|
|
||||||
type __DontNameMe: self::sealed::CantImpl;
|
|
||||||
|
|
||||||
fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::MakeError>>;
|
|
||||||
|
|
||||||
fn make_service_ref(&mut self, target: &Target) -> Self::Future;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, Target, E, ME, S, F, IB, OB> MakeServiceRef<Target, IB> for T
|
|
||||||
where
|
|
||||||
T: for<'a> Service<&'a Target, Error = ME, Response = S, Future = F>,
|
|
||||||
E: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
ME: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
S: HttpService<IB, ResBody = OB, Error = E>,
|
|
||||||
F: Future<Output = Result<S, ME>>,
|
|
||||||
IB: HttpBody,
|
|
||||||
OB: HttpBody,
|
|
||||||
{
|
|
||||||
type Error = E;
|
|
||||||
type Service = S;
|
|
||||||
type ResBody = OB;
|
|
||||||
type MakeError = ME;
|
|
||||||
type Future = F;
|
|
||||||
|
|
||||||
type __DontNameMe = self::sealed::CantName;
|
|
||||||
|
|
||||||
fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::MakeError>> {
|
|
||||||
self.poll_ready(cx)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn make_service_ref(&mut self, target: &Target) -> Self::Future {
|
|
||||||
self.call(target)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, Target, S, B1, B2> self::sealed::Sealed<(Target, B1)> for T
|
|
||||||
where
|
|
||||||
T: for<'a> Service<&'a Target, Response = S>,
|
|
||||||
S: HttpService<B1, ResBody = B2>,
|
|
||||||
B1: HttpBody,
|
|
||||||
B2: HttpBody,
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a `MakeService` from a function.
|
|
||||||
pub fn make_service_fn<F, Target, Ret>(f: F) -> MakeServiceFn<F>
|
|
||||||
where
|
|
||||||
F: FnMut(&Target) -> Ret,
|
|
||||||
Ret: Future,
|
|
||||||
{
|
|
||||||
MakeServiceFn { f }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// `MakeService` returned from [`make_service_fn`]
|
|
||||||
#[derive(Clone, Copy)]
|
|
||||||
pub struct MakeServiceFn<F> {
|
|
||||||
f: F,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'t, F, Ret, Target, Svc, MkErr> Service<&'t Target> for MakeServiceFn<F>
|
|
||||||
where
|
|
||||||
F: FnMut(&Target) -> Ret,
|
|
||||||
Ret: Future<Output = Result<Svc, MkErr>>,
|
|
||||||
MkErr: Into<Box<dyn StdError + Send + Sync>>,
|
|
||||||
{
|
|
||||||
type Error = MkErr;
|
|
||||||
type Response = Svc;
|
|
||||||
type Future = Ret;
|
|
||||||
|
|
||||||
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&mut self, target: &'t Target) -> Self::Future {
|
|
||||||
(self.f)(target)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F> fmt::Debug for MakeServiceFn<F> {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
f.debug_struct("MakeServiceFn").finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mod sealed {
|
mod sealed {
|
||||||
pub trait Sealed<X> {}
|
pub trait Sealed<X> {}
|
||||||
|
|
||||||
#[allow(unreachable_pub)] // This is intentional.
|
|
||||||
pub trait CantImpl {}
|
|
||||||
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
pub enum CantName {}
|
|
||||||
|
|
||||||
impl CantImpl for CantName {}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,8 +10,6 @@
|
|||||||
//!
|
//!
|
||||||
//! - `HttpService`: This is blanketly implemented for all types that
|
//! - `HttpService`: This is blanketly implemented for all types that
|
||||||
//! implement `Service<http::Request<B1>, Response = http::Response<B2>>`.
|
//! implement `Service<http::Request<B1>, Response = http::Response<B2>>`.
|
||||||
//! - `MakeService`: When a `Service` returns a new `Service` as its "response",
|
|
||||||
//! we consider it a `MakeService`. Again, blanketly implemented in those cases.
|
|
||||||
//! - `MakeConnection`: A `Service` that returns a "connection", a type that
|
//! - `MakeConnection`: A `Service` that returns a "connection", a type that
|
||||||
//! implements `AsyncRead` and `AsyncWrite`.
|
//! implements `AsyncRead` and `AsyncWrite`.
|
||||||
//!
|
//!
|
||||||
@@ -24,16 +22,6 @@
|
|||||||
//! The helper [`service_fn`](service_fn) should be sufficient for most cases, but
|
//! The helper [`service_fn`](service_fn) should be sufficient for most cases, but
|
||||||
//! if you need to implement `Service` for a type manually, you can follow the example
|
//! if you need to implement `Service` for a type manually, you can follow the example
|
||||||
//! in `service_struct_impl.rs`.
|
//! in `service_struct_impl.rs`.
|
||||||
//!
|
|
||||||
//! # MakeService
|
|
||||||
//!
|
|
||||||
//! Since a `Service` is bound to a single connection, a [`Server`](crate::Server)
|
|
||||||
//! needs a way to make them as it accepts connections. This is what a
|
|
||||||
//! `MakeService` does.
|
|
||||||
//!
|
|
||||||
//! Resources that need to be shared by all `Service`s can be put into a
|
|
||||||
//! `MakeService`, and then passed to individual `Service`s when `call`
|
|
||||||
//! is called.
|
|
||||||
|
|
||||||
pub use tower_service::Service;
|
pub use tower_service::Service;
|
||||||
|
|
||||||
@@ -43,13 +31,11 @@ mod make;
|
|||||||
mod oneshot;
|
mod oneshot;
|
||||||
mod util;
|
mod util;
|
||||||
|
|
||||||
|
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))]
|
||||||
pub(super) use self::http::HttpService;
|
pub(super) use self::http::HttpService;
|
||||||
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "client"))]
|
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "client"))]
|
||||||
pub(super) use self::make::MakeConnection;
|
pub(super) use self::make::MakeConnection;
|
||||||
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))]
|
|
||||||
pub(super) use self::make::MakeServiceRef;
|
|
||||||
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "client"))]
|
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "client"))]
|
||||||
pub(super) use self::oneshot::{oneshot, Oneshot};
|
pub(super) use self::oneshot::{oneshot, Oneshot};
|
||||||
|
|
||||||
pub use self::make::make_service_fn;
|
|
||||||
pub use self::util::service_fn;
|
pub use self::util::service_fn;
|
||||||
|
|||||||
Reference in New Issue
Block a user