feat(server): re-design Server as higher-level API

The `hyper::Server` is now a proper higher-level API for running HTTP
servers. There is a related `hyper::server::Builder` type, to construct
a `Server`. All other types (`Http`, `Serve`, etc) were moved into the
"lower-level" `hyper::server::conn` module.

The `Server` is a `Future` representing a listening HTTP server. Options
needed to build one are set on the `Builder`.

As `Server` is just a `Future`, it no longer owns a thread-blocking
executor, and can thus be run next to other servers, clients, or
what-have-you.

Closes #1322
Closes #1263

BREAKING CHANGE: The `Server` is no longer created from `Http::bind`,
  nor is it `run`. It is a `Future` that must be polled by an
  `Executor`.

  The `hyper::server::Http` type has move to
  `hyper::server::conn::Http`.
This commit is contained in:
Sean McArthur
2018-04-16 11:57:50 -07:00
parent 35c38cba6e
commit c4974500ab
16 changed files with 798 additions and 833 deletions

View File

@@ -13,7 +13,8 @@ use tokio::runtime::Runtime;
use tokio::net::TcpListener;
use hyper::{Body, Method, Request, Response};
use hyper::server::Http;
use hyper::client::HttpConnector;
use hyper::server::conn::Http;
#[bench]
@@ -21,8 +22,10 @@ fn get_one_at_a_time(b: &mut test::Bencher) {
let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt);
let client = hyper::Client::configure()
.build_with_executor(&rt.reactor(), rt.executor());
let connector = HttpConnector::new_with_handle(1, rt.reactor().clone());
let client = hyper::Client::builder()
.executor(rt.executor())
.build::<_, Body>(connector);
let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap();
@@ -43,8 +46,10 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt);
let client = hyper::Client::configure()
.build_with_executor(&rt.reactor(), rt.executor());
let connector = HttpConnector::new_with_handle(1, rt.reactor().clone());
let client = hyper::Client::builder()
.executor(rt.executor())
.build::<_, Body>(connector);
let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap();
@@ -71,7 +76,7 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
let listener = TcpListener::bind(&addr).unwrap();
let addr = listener.local_addr().unwrap();
let http = Http::<hyper::Chunk>::new();
let http = Http::new();
let service = const_service(service_fn(|req: Request<Body>| {
req.into_body()
@@ -81,6 +86,7 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
})
}));
// Specifically only accept 1 connection.
let srv = listener.incoming()
.into_future()
.map_err(|(e, _inc)| panic!("accept error: {}", e))

View File

@@ -14,24 +14,28 @@ use std::sync::mpsc;
use futures::{future, stream, Future, Stream};
use futures::sync::oneshot;
use hyper::{Body, Request, Response};
use hyper::{Body, Request, Response, Server};
use hyper::server::Service;
macro_rules! bench_server {
($b:ident, $header:expr, $body:expr) => ({
let _ = pretty_env_logger::try_init();
let (_until_tx, until_rx) = oneshot::channel();
let (_until_tx, until_rx) = oneshot::channel::<()>();
let addr = {
let (addr_tx, addr_rx) = mpsc::channel();
::std::thread::spawn(move || {
let addr = "127.0.0.1:0".parse().unwrap();
let srv = hyper::server::Http::new().bind(&addr, || Ok(BenchPayload {
header: $header,
body: $body,
})).unwrap();
let addr = srv.local_addr().unwrap();
addr_tx.send(addr).unwrap();
tokio::run(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
let srv = Server::bind(&addr)
.serve(|| Ok(BenchPayload {
header: $header,
body: $body,
}));
addr_tx.send(srv.local_addr()).unwrap();
let fut = srv
.map_err(|e| panic!("server error: {}", e))
.select(until_rx.then(|_| Ok(())))
.then(|_| Ok(()));
tokio::run(fut);
});
addr_rx.recv().unwrap()

View File

@@ -5,15 +5,15 @@ extern crate pretty_env_logger;
extern crate tokio;
use futures::Future;
use futures::future::lazy;
use hyper::{Body, Response};
use hyper::server::{Http, const_service, service_fn};
use hyper::server::{Server, const_service, service_fn};
static PHRASE: &'static [u8] = b"Hello World!";
fn main() {
pretty_env_logger::init();
let addr = ([127, 0, 0, 1], 3000).into();
let new_service = const_service(service_fn(|_| {
@@ -21,13 +21,11 @@ fn main() {
Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE)))
}));
tokio::run(lazy(move || {
let server = Http::new()
.sleep_on_errors(true)
.bind(&addr, new_service)
.unwrap();
let server = Server::bind(&addr)
.serve(new_service)
.map_err(|e| eprintln!("server error: {}", e));
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
println!("Listening on http://{}", addr);
tokio::run(server);
}

View File

@@ -4,11 +4,11 @@ extern crate futures;
extern crate pretty_env_logger;
extern crate tokio;
use futures::{Future, Stream};
use futures::{Future};
use futures::future::{FutureResult, lazy};
use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};
static INDEX1: &'static [u8] = b"The 1st service!";
static INDEX2: &'static [u8] = b"The 2nd service!";
@@ -40,25 +40,23 @@ impl Service for Srv {
fn main() {
pretty_env_logger::init();
let addr1 = "127.0.0.1:1337".parse().unwrap();
let addr2 = "127.0.0.1:1338".parse().unwrap();
let addr1 = ([127, 0, 0, 1], 1337).into();
let addr2 = ([127, 0, 0, 1], 1338).into();
tokio::run(lazy(move || {
let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap();
let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap();
let srv1 = Server::bind(&addr1)
.serve(|| Ok(Srv(INDEX1)))
.map_err(|e| eprintln!("server 1 error: {}", e));
println!("Listening on http://{}", srv1.incoming_ref().local_addr());
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
let srv2 = Server::bind(&addr2)
.serve(|| Ok(Srv(INDEX2)))
.map_err(|e| eprintln!("server 2 error: {}", e));
tokio::spawn(srv1.for_each(move |conn| {
tokio::spawn(conn.map_err(|err| println!("srv1 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));
println!("Listening on http://{} and http://{}", addr1, addr2);
tokio::spawn(srv2.for_each(move |conn| {
tokio::spawn(conn.map_err(|err| println!("srv2 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));
tokio::spawn(srv1);
tokio::spawn(srv2);
Ok(())
}));

View File

@@ -6,10 +6,9 @@ extern crate tokio;
extern crate url;
use futures::{Future, Stream};
use futures::future::lazy;
use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};
use std::collections::HashMap;
use url::form_urlencoded;
@@ -96,11 +95,12 @@ impl Service for ParamExample {
fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
let addr = ([127, 0, 0, 1], 1337).into();
let server = Server::bind(&addr)
.serve(|| Ok(ParamExample))
.map_err(|e| eprintln!("server error: {}", e));
tokio::run(server);
}

View File

@@ -5,11 +5,10 @@ extern crate pretty_env_logger;
extern crate tokio;
use futures::{Future/*, Sink*/};
use futures::future::lazy;
use futures::sync::oneshot;
use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};
use std::fs::File;
use std::io::{self, copy/*, Read*/};
@@ -138,11 +137,14 @@ impl Service for ResponseExamples {
fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
let server = Server::bind(&addr)
.serve(|| Ok(ResponseExamples))
.map_err(|e| eprintln!("server error: {}", e));
println!("Listening on http://{}", addr);
tokio::run(server);
}

View File

@@ -5,10 +5,10 @@ extern crate pretty_env_logger;
extern crate tokio;
use futures::Future;
use futures::future::{FutureResult, lazy};
use futures::future::{FutureResult};
use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};
static INDEX: &'static [u8] = b"Try POST /echo";
@@ -41,11 +41,14 @@ impl Service for Echo {
fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
let addr = ([127, 0, 0, 1], 1337).into();
let server = Server::bind(&addr)
.serve(|| Ok(Echo))
.map_err(|e| eprintln!("server error: {}", e));
println!("Listening on http://{}", addr);
tokio::run(server);
}

View File

@@ -9,7 +9,7 @@ use futures::future::lazy;
use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode};
use hyper::client::HttpConnector;
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};
#[allow(unused, deprecated)]
use std::ascii::AsciiExt;
@@ -75,15 +75,17 @@ impl Service for ResponseExamples {
fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
tokio::run(lazy(move || {
let client = Client::new();
let serve = Http::new().serve_addr(&addr, move || Ok(ResponseExamples(client.clone()))).unwrap();
println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());
let server = Server::bind(&addr)
.serve(move || Ok(ResponseExamples(client.clone())))
.map_err(|e| eprintln!("server error: {}", e));
serve.map_err(|_| ()).for_each(move |conn| {
tokio::spawn(conn.map_err(|err| println!("serve error: {:?}", err)))
})
println!("Listening on http://{}", addr);
server
}));
}

View File

@@ -171,12 +171,12 @@ impl Error {
Error::new(Kind::Io, Some(cause.into()))
}
pub(crate) fn new_listen(err: io::Error) -> Error {
Error::new(Kind::Listen, Some(err.into()))
pub(crate) fn new_listen<E: Into<Cause>>(cause: E) -> Error {
Error::new(Kind::Listen, Some(cause.into()))
}
pub(crate) fn new_accept(err: io::Error) -> Error {
Error::new(Kind::Accept, Some(Box::new(err)))
pub(crate) fn new_accept<E: Into<Cause>>(cause: E) -> Error {
Error::new(Kind::Accept, Some(cause.into()))
}
pub(crate) fn new_connect<E: Into<Cause>>(cause: E) -> Error {

View File

@@ -10,8 +10,19 @@ use tokio_io::{AsyncRead, AsyncWrite};
use proto::{Http1Transaction, MessageHead};
const INIT_BUFFER_SIZE: usize = 8192;
pub const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
/// The initial buffer size allocated before trying to read from IO.
pub(crate) const INIT_BUFFER_SIZE: usize = 8192;
/// The default maximum read buffer size. If the buffer gets this big and
/// a message is still not complete, a `TooLarge` error is triggered.
// Note: if this changes, update server::conn::Http::max_buf_size docs.
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
/// The maximum number of distinct `Buf`s to hold in a list before requiring
/// a flush. Only affects when the buffer strategy is to queue buffers.
///
/// Note that a flush can happen before reaching the maximum. This simply
/// forces a flush if the queue gets this big.
const MAX_BUF_LIST_BUFFERS: usize = 16;
pub struct Buffered<T, B> {

View File

@@ -5,19 +5,59 @@
//! are not handled at this level. This module provides the building blocks to
//! customize those things externally.
//!
//! If don't have need to manage connections yourself, consider using the
//! If you don't have need to manage connections yourself, consider using the
//! higher-level [Server](super) API.
use std::fmt;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use futures::{Future, Poll};
use futures::future::{Either};
use futures::{Async, Future, Poll, Stream};
use futures::future::{Either, Executor};
use tokio_io::{AsyncRead, AsyncWrite};
//TODO: change these tokio:: to sub-crates
use tokio::reactor::Handle;
use common::Exec;
use proto;
use body::{Body, Payload};
use super::{HyperService, Request, Response, Service};
use super::{HyperService, NewService, Request, Response, Service};
pub use super::tcp::AddrIncoming;
/// A lower-level configuration of the HTTP protocol.
///
/// This structure is used to configure options for an HTTP server connection.
///
/// If don't have need to manage connections yourself, consider using the
/// higher-level [Server](super) API.
#[derive(Clone, Debug)]
pub struct Http {
exec: Exec,
http2: bool,
keep_alive: bool,
max_buf_size: Option<usize>,
pipeline_flush: bool,
}
/// A stream mapping incoming IOs to new services.
///
/// Yields `Connection`s that are futures that should be put on a reactor.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Serve<I, S> {
incoming: I,
new_service: S,
protocol: Http,
}
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub(super) struct SpawnAll<I, S> {
serve: Serve<I, S>,
}
/// A future binding a connection with a Service.
///
@@ -63,6 +103,187 @@ pub struct Parts<T> {
_inner: (),
}
// ===== impl Http =====
impl Http {
/// Creates a new instance of the HTTP protocol, ready to spawn a server or
/// start accepting connections.
pub fn new() -> Http {
Http {
exec: Exec::Default,
http2: false,
keep_alive: true,
max_buf_size: None,
pipeline_flush: false,
}
}
/// Sets whether HTTP2 is required.
///
/// Default is false
pub fn http2_only(&mut self, val: bool) -> &mut Self {
self.http2 = val;
self
}
/// Enables or disables HTTP keep-alive.
///
/// Default is true.
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
self.keep_alive = val;
self
}
/// Set the maximum buffer size for the connection.
///
/// Default is ~400kb.
pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
self.max_buf_size = Some(max);
self
}
/// Aggregates flushes to better support pipelined responses.
///
/// Experimental, may be have bugs.
///
/// Default is false.
pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
self.pipeline_flush = enabled;
self
}
/// Set the executor used to spawn background tasks.
///
/// Default uses implicit default (like `tokio::spawn`).
pub fn executor<E>(&mut self, exec: E) -> &mut Self
where
E: Executor<Box<Future<Item=(), Error=()> + Send>> + Send + Sync + 'static
{
self.exec = Exec::Executor(Arc::new(exec));
self
}
/// Bind a connection together with a `Service`.
///
/// This returns a Future that must be polled in order for HTTP to be
/// driven on the connection.
///
/// # Example
///
/// ```
/// # extern crate futures;
/// # extern crate hyper;
/// # extern crate tokio;
/// # extern crate tokio_io;
/// # use futures::Future;
/// # use hyper::{Body, Request, Response};
/// # use hyper::server::Service;
/// # use hyper::server::conn::Http;
/// # use tokio_io::{AsyncRead, AsyncWrite};
/// # use tokio::reactor::Handle;
/// # fn run<I, S>(some_io: I, some_service: S)
/// # where
/// # I: AsyncRead + AsyncWrite + Send + 'static,
/// # S: Service<Request=Request<Body>, Response=Response<Body>, Error=hyper::Error> + Send + 'static,
/// # S::Future: Send
/// # {
/// let http = Http::new();
/// let conn = http.serve_connection(some_io, some_service);
///
/// let fut = conn.map_err(|e| {
/// eprintln!("server connection error: {}", e);
/// });
///
/// tokio::spawn(fut);
/// # }
/// # fn main() {}
/// ```
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S>
where
S: Service<Request = Request<Body>, Response = Response<Bd>>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Future: Send + 'static,
Bd: Payload,
I: AsyncRead + AsyncWrite,
{
let either = if !self.http2 {
let mut conn = proto::Conn::new(io);
if !self.keep_alive {
conn.disable_keep_alive();
}
conn.set_flush_pipeline(self.pipeline_flush);
if let Some(max) = self.max_buf_size {
conn.set_max_buf_size(max);
}
let sd = proto::h1::dispatch::Server::new(service);
Either::A(proto::h1::Dispatcher::new(sd, conn))
} else {
let h2 = proto::h2::Server::new(io, service, self.exec.clone());
Either::B(h2)
};
Connection {
conn: either,
}
}
/// Bind the provided `addr` with the default `Handle` and return [`Serve`](Serve).
///
/// This method will bind the `addr` provided with a new TCP listener ready
/// to accept connections. Each connection will be processed with the
/// `new_service` object provided, creating a new service per
/// connection.
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
where
S: NewService<Request=Request<Body>, Response=Response<Bd>>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
{
let mut incoming = AddrIncoming::new(addr, None)?;
if self.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
Ok(self.serve_incoming(incoming, new_service))
}
/// Bind the provided `addr` with the `Handle` and return a [`Serve`](Serve)
///
/// This method will bind the `addr` provided with a new TCP listener ready
/// to accept connections. Each connection will be processed with the
/// `new_service` object provided, creating a new service per
/// connection.
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
where
S: NewService<Request = Request<Body>, Response = Response<Bd>>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
{
let mut incoming = AddrIncoming::new(addr, Some(handle))?;
if self.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
Ok(self.serve_incoming(incoming, new_service))
}
/// Bind the provided stream of incoming IO objects with a `NewService`.
pub fn serve_incoming<I, S, Bd>(&self, incoming: I, new_service: S) -> Serve<I, S>
where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite,
S: NewService<Request = Request<Body>, Response = Response<Bd>>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
{
Serve {
incoming: incoming,
new_service: new_service,
protocol: self.clone(),
}
}
}
// ===== impl Connection =====
impl<I, B, S> Connection<I, S>
@@ -154,3 +375,89 @@ where
}
}
// ===== impl Serve =====
impl<I, S> Serve<I, S> {
/// Spawn all incoming connections onto the executor in `Http`.
pub(super) fn spawn_all(self) -> SpawnAll<I, S> {
SpawnAll {
serve: self,
}
}
/// Get a reference to the incoming stream.
#[inline]
pub fn incoming_ref(&self) -> &I {
&self.incoming
}
/// Get a mutable reference to the incoming stream.
#[inline]
pub fn incoming_mut(&mut self) -> &mut I {
&mut self.incoming
}
}
impl<I, S, B> Stream for Serve<I, S>
where
I: Stream,
I::Item: AsyncRead + AsyncWrite,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
S: NewService<Request=Request<Body>, Response=Response<B>>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
<S::Instance as Service>::Future: Send + 'static,
B: Payload,
{
type Item = Connection<I::Item, S::Instance>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) {
let service = self.new_service.new_service().map_err(::Error::new_user_new_service)?;
Ok(Async::Ready(Some(self.protocol.serve_connection(io, service))))
} else {
Ok(Async::Ready(None))
}
}
}
// ===== impl SpawnAll =====
impl<S> SpawnAll<AddrIncoming, S> {
pub(super) fn local_addr(&self) -> SocketAddr {
self.serve.incoming.local_addr()
}
}
impl<I, S> SpawnAll<I, S> {
pub(super) fn incoming_ref(&self) -> &I {
self.serve.incoming_ref()
}
}
impl<I, S, B> Future for SpawnAll<I, S>
where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
S: NewService<Request = Request<Body>, Response = Response<B>> + Send + 'static,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
<S as NewService>::Instance: Send,
<<S as NewService>::Instance as Service>::Future: Send + 'static,
B: Payload,
{
type Item = ();
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
if let Some(conn) = try_ready!(self.serve.poll()) {
let fut = conn
.map_err(|err| debug!("conn error: {}", err));
self.serve.protocol.exec.execute(fut);
} else {
return Ok(Async::Ready(()))
}
}
}
}

View File

@@ -2,797 +2,175 @@
//!
//! A `Server` is created to listen on a port, parse HTTP requests, and hand
//! them off to a `Service`.
//!
//! There are two levels of APIs provide for constructing HTTP servers:
//!
//! - The higher-level [`Server`](Server).
//! - The lower-level [conn](conn) module.
pub mod conn;
mod service;
mod tcp;
use std::fmt;
use std::io;
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::sync::{Arc, Mutex, Weak};
use std::net::SocketAddr;
use std::time::Duration;
use futures::task::{self, Task};
use futures::future::{self, Either, Executor};
use futures::{Future, Stream, Poll, Async};
use futures_timer::Delay;
use futures::{Future, Stream, Poll};
use http::{Request, Response};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::spawn;
use tokio::reactor::Handle;
use tokio::net::TcpListener;
pub use tokio_service::{NewService, Service};
use body::{Body, Payload};
use common::Exec;
use proto;
use self::addr_stream::AddrStream;
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
// error that `hyper::server::Http` is private...
use self::conn::{Http as Http_, SpawnAll};
use self::hyper_service::HyperService;
use self::tcp::{AddrIncoming};
pub use self::conn::Connection;
pub use self::service::{const_service, service_fn};
/// A configuration of the HTTP protocol.
/// A listening HTTP server.
///
/// This structure is used to create instances of `Server` or to spawn off tasks
/// which handle a connection to an HTTP server. Each instance of `Http` can be
/// configured with various protocol-level options such as keepalive.
#[derive(Clone, Debug)]
pub struct Http {
exec: Exec,
http2: bool,
keep_alive: bool,
max_buf_size: Option<usize>,
pipeline: bool,
sleep_on_errors: bool,
/// `Server` is a `Future` mapping a bound listener with a set of service
/// handlers. It is built using the [`Builder`](Builder), and the future
/// completes when the server has been shutdown. It should be run by an
/// `Executor`.
pub struct Server<I, S> {
spawn_all: SpawnAll<I, S>,
}
/// An instance of a server created through `Http::bind`.
///
/// This server is intended as a convenience for creating a TCP listener on an
/// address and then serving TCP connections accepted with the service provided.
pub struct Server<S> {
protocol: Http,
new_service: S,
handle: Handle,
listener: TcpListener,
shutdown_timeout: Duration,
}
/// A stream mapping incoming IOs to new services.
///
/// Yields `Connection`s that are futures that should be put on a reactor.
#[must_use = "streams do nothing unless polled"]
/// A builder for a [`Server`](Server).
#[derive(Debug)]
pub struct Serve<I, S> {
pub struct Builder<I> {
incoming: I,
new_service: S,
protocol: Http,
protocol: Http_,
}
/*
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct SpawnAll<I, S, E> {
executor: E,
serve: Serve<I, S>,
}
*/
/// A stream of connections from binding to an address.
#[must_use = "streams do nothing unless polled"]
pub struct AddrIncoming {
addr: SocketAddr,
keep_alive_timeout: Option<Duration>,
listener: TcpListener,
handle: Handle,
sleep_on_errors: bool,
timeout: Option<Delay>,
}
impl fmt::Debug for AddrIncoming {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("AddrIncoming")
.field("addr", &self.addr)
.field("keep_alive_timeout", &self.keep_alive_timeout)
.field("listener", &self.listener)
.field("handle", &self.handle)
.field("sleep_on_errors", &self.sleep_on_errors)
.finish()
}
}
// ===== impl Http =====
impl Http {
/// Creates a new instance of the HTTP protocol, ready to spawn a server or
/// start accepting connections.
pub fn new() -> Http {
Http {
exec: Exec::Default,
http2: false,
keep_alive: true,
max_buf_size: None,
pipeline: false,
sleep_on_errors: false,
}
}
/// Sets whether HTTP2 is required.
///
/// Default is false
pub fn http2_only(&mut self, val: bool) -> &mut Self {
self.http2 = val;
self
}
/// Enables or disables HTTP keep-alive.
///
/// Default is true.
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
self.keep_alive = val;
self
}
/// Set the maximum buffer size for the connection.
pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
self.max_buf_size = Some(max);
self
}
/// Aggregates flushes to better support pipelined responses.
///
/// Experimental, may be have bugs.
///
/// Default is false.
pub fn pipeline(&mut self, enabled: bool) -> &mut Self {
self.pipeline = enabled;
self
}
/// Set the executor used to spawn background tasks.
///
/// Default uses implicit default (like `tokio::spawn`).
pub fn executor<E>(&mut self, exec: E) -> &mut Self
where
E: Executor<Box<Future<Item=(), Error=()> + Send>> + Send + Sync + 'static
{
self.exec = Exec::Executor(Arc::new(exec));
self
}
/// Swallow connection accept errors. Instead of passing up IO errors when
/// the server is under heavy load the errors will be ignored. Some
/// connection accept errors (like "connection reset") can be ignored, some
/// (like "too many files open") may consume 100% CPU and a timout of 10ms
/// is used in that case.
///
/// Default is false.
pub fn sleep_on_errors(&mut self, enabled: bool) -> &mut Self {
self.sleep_on_errors = enabled;
self
}
/// Bind the provided `addr` and return a server ready to handle
/// connections.
///
/// This method will bind the `addr` provided with a new TCP listener ready
/// to accept connections. Each connection will be processed with the
/// `new_service` object provided as well, creating a new service per
/// connection.
///
/// The returned `Server` contains one method, `run`, which is used to
/// actually run the server.
pub fn bind<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Server<S>>
where
S: NewService<Request=Request<Body>, Response=Response<Bd>> + 'static,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
{
let handle = Handle::current();
let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?;
let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?;
Ok(Server {
new_service: new_service,
handle: handle,
listener: listener,
protocol: self.clone(),
shutdown_timeout: Duration::new(1, 0),
})
}
/// Bind the provided `addr` and return a server with the default `Handle`.
///
/// This is method will bind the `addr` provided with a new TCP listener ready
/// to accept connections. Each connection will be processed with the
/// `new_service` object provided as well, creating a new service per
/// connection.
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
where
S: NewService<Request=Request<Body>, Response=Response<Bd>>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
{
let handle = Handle::current();
let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?;
let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?;
let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors).map_err(::Error::new_listen)?;
if self.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
Ok(self.serve_incoming(incoming, new_service))
}
/// Bind the provided `addr` and return a server with a shared `Core`.
///
/// This method allows the ability to share a `Core` with multiple servers.
///
/// This is method will bind the `addr` provided with a new TCP listener ready
/// to accept connections. Each connection will be processed with the
/// `new_service` object provided as well, creating a new service per
/// connection.
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
where
S: NewService<Request = Request<Body>, Response = Response<Bd>>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
{
let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?;
let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?;
let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors).map_err(::Error::new_listen)?;
if self.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
Ok(self.serve_incoming(incoming, new_service))
}
/// Bind the provided stream of incoming IO objects with a `NewService`.
///
/// This method allows the ability to share a `Core` with multiple servers.
pub fn serve_incoming<I, S, Bd>(&self, incoming: I, new_service: S) -> Serve<I, S>
where
I: Stream<Error=::std::io::Error>,
I::Item: AsyncRead + AsyncWrite,
S: NewService<Request = Request<Body>, Response = Response<Bd>>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
{
Serve {
incoming: incoming,
new_service: new_service,
protocol: self.clone(),
}
}
/// Bind a connection together with a Service.
///
/// This returns a Future that must be polled in order for HTTP to be
/// driven on the connection.
///
/// # Example
///
/// ```
/// # extern crate futures;
/// # extern crate hyper;
/// # extern crate tokio;
/// # extern crate tokio_io;
/// # use futures::Future;
/// # use hyper::{Body, Request, Response};
/// # use hyper::server::{Http, Service};
/// # use tokio_io::{AsyncRead, AsyncWrite};
/// # use tokio::reactor::Handle;
/// # fn run<I, S>(some_io: I, some_service: S)
/// # where
/// # I: AsyncRead + AsyncWrite + Send + 'static,
/// # S: Service<Request=Request<Body>, Response=Response<Body>, Error=hyper::Error> + Send + 'static,
/// # S::Future: Send
/// # {
/// let http = Http::new();
/// let conn = http.serve_connection(some_io, some_service);
///
/// let fut = conn
/// .map_err(|e| eprintln!("server connection error: {}", e));
///
/// tokio::spawn(fut);
/// # }
/// # fn main() {}
/// ```
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S>
where
S: Service<Request = Request<Body>, Response = Response<Bd>>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Future: Send + 'static,
Bd: Payload,
I: AsyncRead + AsyncWrite,
{
let either = if !self.http2 {
let mut conn = proto::Conn::new(io);
if !self.keep_alive {
conn.disable_keep_alive();
}
conn.set_flush_pipeline(self.pipeline);
if let Some(max) = self.max_buf_size {
conn.set_max_buf_size(max);
}
let sd = proto::h1::dispatch::Server::new(service);
Either::A(proto::h1::Dispatcher::new(sd, conn))
} else {
let h2 = proto::h2::Server::new(io, service, self.exec.clone());
Either::B(h2)
};
Connection {
conn: either,
}
}
}
// ===== impl Server =====
/// TODO: add docs
pub struct Run(Box<Future<Item=(), Error=::Error> + Send + 'static>);
impl fmt::Debug for Run {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Run").finish()
impl<I> Server<I, ()> {
/// Starts a [`Builder`](Builder) with the provided incoming stream.
pub fn builder(incoming: I) -> Builder<I> {
Builder {
incoming,
protocol: Http_::new(),
}
}
}
impl Future for Run {
type Item = ();
type Error = ::Error;
impl Server<AddrIncoming, ()> {
/// Binds to the provided address, and returns a [`Builder`](Builder).
///
/// # Panics
///
/// This method will panic if binding to the address fails. For a method
/// to bind to an address and return a `Result`, see `Server::try_bind`.
pub fn bind(addr: &SocketAddr) -> Builder<AddrIncoming> {
let incoming = AddrIncoming::new(addr, None)
.unwrap_or_else(|e| {
panic!("error binding to {}: {}", addr, e);
});
Server::builder(incoming)
}
fn poll(&mut self) -> Poll<(), ::Error> {
self.0.poll()
/// Tries to bind to the provided address, and returns a [`Builder`](Builder).
pub fn try_bind(addr: &SocketAddr) -> ::Result<Builder<AddrIncoming>> {
AddrIncoming::new(addr, None)
.map(Server::builder)
}
}
impl<S> Server<AddrIncoming, S> {
/// Returns the local address that this server is bound to.
pub fn local_addr(&self) -> SocketAddr {
self.spawn_all.local_addr()
}
}
impl<S, B> Server<S>
impl<I, S, B> Future for Server<I, S>
where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
S: NewService<Request = Request<Body>, Response = Response<B>> + Send + 'static,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
<S as NewService>::Instance: Send,
<<S as NewService>::Instance as Service>::Future: Send + 'static,
B: Payload + Send + 'static,
B::Data: Send,
{
/// Returns the local address that this server is bound to.
pub fn local_addr(&self) -> ::Result<SocketAddr> {
//TODO: this shouldn't return an error at all, but should get the
//local_addr at construction
self.listener.local_addr().map_err(::Error::new_io)
}
/// Configure the amount of time this server will wait for a "graceful
/// shutdown".
///
/// This is the amount of time after the shutdown signal is received the
/// server will wait for all pending connections to finish. If the timeout
/// elapses then the server will be forcibly shut down.
///
/// This defaults to 1s.
pub fn shutdown_timeout(&mut self, timeout: Duration) -> &mut Self {
self.shutdown_timeout = timeout;
self
}
/// Execute this server infinitely.
///
/// This method does not currently return, but it will return an error if
/// one occurs.
pub fn run(self) -> Run {
self.run_until(future::empty())
}
/// Execute this server until the given future, `shutdown_signal`, resolves.
///
/// This method, like `run` above, is used to execute this HTTP server. The
/// difference with `run`, however, is that this method allows for shutdown
/// in a graceful fashion. The future provided is interpreted as a signal to
/// shut down the server when it resolves.
///
/// This method will block the current thread executing the HTTP server.
/// When the `shutdown_signal` has resolved then the TCP listener will be
/// unbound (dropped). The thread will continue to block for a maximum of
/// `shutdown_timeout` time waiting for active connections to shut down.
/// Once the `shutdown_timeout` elapses or all active connections are
/// cleaned out then this method will return.
pub fn run_until<F>(self, shutdown_signal: F) -> Run
where F: Future<Item = (), Error = ()> + Send + 'static,
{
let Server { protocol, new_service, handle, listener, shutdown_timeout } = self;
let mut incoming = match AddrIncoming::new(listener, handle.clone(), protocol.sleep_on_errors) {
Ok(incoming) => incoming,
Err(err) => return Run(Box::new(future::err(::Error::new_listen(err)))),
};
if protocol.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
// Mini future to track the number of active services
let info = Arc::new(Mutex::new(Info {
active: 0,
blocker: None,
}));
// Future for our server's execution
let info_cloned = info.clone();
let srv = incoming.for_each(move |socket| {
let addr = socket.remote_addr;
debug!("accepted new connection ({})", addr);
let service = new_service.new_service()?;
let s = NotifyService {
inner: service,
info: Arc::downgrade(&info_cloned),
};
info_cloned.lock().unwrap().active += 1;
let fut = protocol.serve_connection(socket, s)
.map(|_| ())
.map_err(move |err| error!("server connection error: ({}) {}", addr, err));
spawn(fut);
Ok(())
});
// for now, we don't care if the shutdown signal succeeds or errors
// as long as it resolves, we will shutdown.
let shutdown_signal = shutdown_signal.then(|_| Ok(()));
// Main execution of the server. Here we use `select` to wait for either
// `incoming` or `f` to resolve. We know that `incoming` will never
// resolve with a success (it's infinite) so we're actually just waiting
// for an error or for `f`, our shutdown signal.
//
// When we get a shutdown signal (`Ok`) then we drop the TCP listener to
// stop accepting incoming connections.
let main_execution = shutdown_signal.select(srv).then(move |result| {
match result {
Ok(((), _incoming)) => {},
Err((e, _other)) => return future::Either::A(future::err(::Error::new_accept(e))),
}
// Ok we've stopped accepting new connections at this point, but we want
// to give existing connections a chance to clear themselves out. Wait
// at most `shutdown_timeout` time before we just return clearing
// everything out.
//
// Our custom `WaitUntilZero` will resolve once all services constructed
// here have been destroyed.
let timeout = Delay::new(shutdown_timeout);
let wait = WaitUntilZero { info: info.clone() };
future::Either::B(wait.select(timeout).then(|result| {
match result {
Ok(_) => Ok(()),
//TODO: error variant should be "timed out waiting for graceful shutdown"
Err((e, _)) => Err(::Error::new_io(e))
}
}))
});
Run(Box::new(main_execution))
}
}
impl<S: fmt::Debug> fmt::Debug for Server<S>
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Server")
.field("listener", &self.listener)
.field("new_service", &self.new_service)
.field("protocol", &self.protocol)
.finish()
}
}
// ===== impl Serve =====
impl<I, S> Serve<I, S> {
/*
/// Spawn all incoming connections onto the provide executor.
pub fn spawn_all<E>(self, executor: E) -> SpawnAll<I, S, E> {
SpawnAll {
executor: executor,
serve: self,
}
}
*/
/// Get a reference to the incoming stream.
#[inline]
pub fn incoming_ref(&self) -> &I {
&self.incoming
}
}
impl<I, S, B> Stream for Serve<I, S>
where
I: Stream<Error=io::Error>,
I::Item: AsyncRead + AsyncWrite,
S: NewService<Request=Request<Body>, Response=Response<B>>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
<S::Instance as Service>::Future: Send + 'static,
B: Payload,
{
type Item = Connection<I::Item, S::Instance>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) {
let service = self.new_service.new_service().map_err(::Error::new_user_new_service)?;
Ok(Async::Ready(Some(self.protocol.serve_connection(io, service))))
} else {
Ok(Async::Ready(None))
}
}
}
// ===== impl SpawnAll =====
/*
impl<I, S, E> Future for SpawnAll<I, S, E>
where
I: Stream<Error=io::Error>,
I::Item: AsyncRead + AsyncWrite,
S: NewService<Request=Request<Body>, Response=Response<B>, Error=::Error>,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
//E: Executor<Connection<I::Item, S::Instance>>,
{
type Item = ();
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
if let Some(conn) = try_ready!(self.serve.poll()) {
let fut = conn
.map(|_| ())
.map_err(|err| debug!("conn error: {}", err));
match self.executor.execute(fut) {
Ok(()) => (),
Err(err) => match err.kind() {
ExecuteErrorKind::NoCapacity => {
debug!("SpawnAll::poll; executor no capacity");
// continue loop
},
ExecuteErrorKind::Shutdown | _ => {
debug!("SpawnAll::poll; executor shutdown");
return Ok(Async::Ready(()))
}
}
}
} else {
return Ok(Async::Ready(()))
}
}
self.spawn_all.poll()
}
}
*/
// ===== impl AddrIncoming =====
impl AddrIncoming {
fn new(listener: TcpListener, handle: Handle, sleep_on_errors: bool) -> io::Result<AddrIncoming> {
Ok(AddrIncoming {
addr: listener.local_addr()?,
keep_alive_timeout: None,
listener: listener,
handle: handle,
sleep_on_errors: sleep_on_errors,
timeout: None,
})
impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Server")
.field("listener", &self.spawn_all.incoming_ref())
.finish()
}
/// Get the local address bound to this listener.
pub fn local_addr(&self) -> SocketAddr {
self.addr
}
fn set_keepalive(&mut self, dur: Option<Duration>) {
self.keep_alive_timeout = dur;
}
/*
fn set_sleep_on_errors(&mut self, val: bool) {
self.sleep_on_errors = val;
}
*/
}
impl Stream for AddrIncoming {
// currently unnameable...
type Item = AddrStream;
type Error = ::std::io::Error;
// ===== impl Builder =====
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Check if a previous timeout is active that was set by IO errors.
if let Some(ref mut to) = self.timeout {
match to.poll().expect("timeout never fails") {
Async::Ready(_) => {}
Async::NotReady => return Ok(Async::NotReady),
}
impl<I> Builder<I> {
/// Start a new builder, wrapping an incoming stream and low-level options.
///
/// For a more convenient constructor, see [`Server::bind`](Server::bind).
pub fn new(incoming: I, protocol: Http_) -> Self {
Builder {
incoming,
protocol,
}
self.timeout = None;
loop {
match self.listener.poll_accept() {
Ok(Async::Ready((socket, addr))) => {
if let Some(dur) = self.keep_alive_timeout {
if let Err(e) = socket.set_keepalive(Some(dur)) {
trace!("error trying to set TCP keepalive: {}", e);
}
}
return Ok(Async::Ready(Some(AddrStream::new(socket, addr))));
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(ref e) if self.sleep_on_errors => {
// Connection errors can be ignored directly, continue by
// accepting the next request.
if connection_error(e) {
continue;
}
// Sleep 10ms.
let delay = ::std::time::Duration::from_millis(10);
debug!("accept error: {}; sleeping {:?}",
e, delay);
let mut timeout = Delay::new(delay);
let result = timeout.poll()
.expect("timeout never fails");
match result {
Async::Ready(()) => continue,
Async::NotReady => {
self.timeout = Some(timeout);
return Ok(Async::NotReady);
}
}
},
Err(e) => return Err(e),
}
}
/// Sets whether HTTP/2 is required.
///
/// Default is `false`.
pub fn http2_only(mut self, val: bool) -> Self {
self.protocol.http2_only(val);
self
}
/// Consume this `Builder`, creating a [`Server`](Server).
pub fn serve<S, B>(self, new_service: S) -> Server<I, S>
where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
S: NewService<Request = Request<Body>, Response = Response<B>>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
<S as NewService>::Instance: Send,
<<S as NewService>::Instance as Service>::Future: Send + 'static,
B: Payload,
{
let serve = self.protocol.serve_incoming(self.incoming, new_service);
let spawn_all = serve.spawn_all();
Server {
spawn_all,
}
}
}
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted.
///
/// All other errors will incur a timeout before next `accept()` is performed.
/// The timeout is useful to handle resource exhaustion errors like ENFILE
/// and EMFILE. Otherwise, could enter into tight loop.
fn connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused ||
e.kind() == io::ErrorKind::ConnectionAborted ||
e.kind() == io::ErrorKind::ConnectionReset
}
mod addr_stream {
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use bytes::{Buf, BufMut};
use futures::Poll;
use tokio::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct AddrStream {
inner: TcpStream,
pub(super) remote_addr: SocketAddr,
impl Builder<AddrIncoming> {
/// Set whether TCP keepalive messages are enabled on accepted connections.
///
/// If `None` is specified, keepalive is disabled, otherwise the duration
/// specified will be the time to remain idle before sending TCP keepalive
/// probes.
pub fn tcp_keepalive(mut self, keepalive: Option<Duration>) -> Self {
self.incoming.set_keepalive(keepalive);
self
}
impl AddrStream {
pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream {
AddrStream {
inner: tcp,
remote_addr: addr,
}
}
}
impl Read for AddrStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}
impl Write for AddrStream {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}
#[inline]
fn flush(&mut self ) -> io::Result<()> {
self.inner.flush()
}
}
impl AsyncRead for AddrStream {
#[inline]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
#[inline]
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.read_buf(buf)
}
}
impl AsyncWrite for AddrStream {
#[inline]
fn shutdown(&mut self) -> Poll<(), io::Error> {
AsyncWrite::shutdown(&mut self.inner)
}
#[inline]
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.write_buf(buf)
}
}
}
// ===== NotifyService =====
struct NotifyService<S> {
inner: S,
info: Weak<Mutex<Info>>,
}
struct WaitUntilZero {
info: Arc<Mutex<Info>>,
}
struct Info {
active: usize,
blocker: Option<Task>,
}
impl<S: Service> Service for NotifyService<S> {
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn call(&self, message: Self::Request) -> Self::Future {
self.inner.call(message)
}
}
impl<S> Drop for NotifyService<S> {
fn drop(&mut self) {
let info = match self.info.upgrade() {
Some(info) => info,
None => return,
};
let mut info = info.lock().unwrap();
info.active -= 1;
if info.active == 0 {
if let Some(task) = info.blocker.take() {
task.notify();
}
}
}
}
impl Future for WaitUntilZero {
type Item = ();
type Error = io::Error;
fn poll(&mut self) -> Poll<(), io::Error> {
let mut info = self.info.lock().unwrap();
if info.active == 0 {
Ok(().into())
} else {
info.blocker = Some(task::current());
Ok(Async::NotReady)
}
/// Set the value of `TCP_NODELAY` option for accepted connections.
pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
self.incoming.set_nodelay(enabled);
self
}
}

234
src/server/tcp.rs Normal file
View File

@@ -0,0 +1,234 @@
use std::fmt;
use std::io;
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::time::Duration;
use futures::{Async, Future, Poll, Stream};
use futures_timer::Delay;
//TODO: change to tokio_tcp::net::TcpListener
use tokio::net::TcpListener;
use tokio::reactor::Handle;
use self::addr_stream::AddrStream;
/// A stream of connections from binding to an address.
#[must_use = "streams do nothing unless polled"]
pub struct AddrIncoming {
addr: SocketAddr,
listener: TcpListener,
sleep_on_errors: bool,
tcp_keepalive_timeout: Option<Duration>,
tcp_nodelay: bool,
timeout: Option<Delay>,
}
impl AddrIncoming {
pub(super) fn new(addr: &SocketAddr, handle: Option<&Handle>) -> ::Result<AddrIncoming> {
let listener = if let Some(handle) = handle {
let std_listener = StdTcpListener::bind(addr)
.map_err(::Error::new_listen)?;
TcpListener::from_std(std_listener, handle)
.map_err(::Error::new_listen)?
} else {
TcpListener::bind(addr).map_err(::Error::new_listen)?
};
let addr = listener.local_addr().map_err(::Error::new_listen)?;
Ok(AddrIncoming {
addr: addr,
listener: listener,
sleep_on_errors: true,
tcp_keepalive_timeout: None,
tcp_nodelay: false,
timeout: None,
})
}
/// Get the local address bound to this listener.
pub fn local_addr(&self) -> SocketAddr {
self.addr
}
/// Set whether TCP keepalive messages are enabled on accepted connections.
///
/// If `None` is specified, keepalive is disabled, otherwise the duration
/// specified will be the time to remain idle before sending TCP keepalive
/// probes.
pub fn set_keepalive(&mut self, keepalive: Option<Duration>) -> &mut Self {
self.tcp_keepalive_timeout = keepalive;
self
}
/// Set the value of `TCP_NODELAY` option for accepted connections.
pub fn set_nodelay(&mut self, enabled: bool) -> &mut Self {
self.tcp_nodelay = enabled;
self
}
/// Set whether to sleep on accept errors.
///
/// A possible scenario is that the process has hit the max open files
/// allowed, and so trying to accept a new connection will fail with
/// `EMFILE`. In some cases, it's preferable to just wait for some time, if
/// the application will likely close some files (or connections), and try
/// to accept the connection again. If this option is `true`, the error
/// will be logged at the `error` level, since it is still a big deal,
/// and then the listener will sleep for 1 second.
///
/// In other cases, hitting the max open files should be treat similarly
/// to being out-of-memory, and simply error (and shutdown). Setting
/// this option to `false` will allow that.
///
/// Default is `true`.
pub fn set_sleep_on_errors(&mut self, val: bool) {
self.sleep_on_errors = val;
}
}
impl Stream for AddrIncoming {
// currently unnameable...
type Item = AddrStream;
type Error = ::std::io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Check if a previous timeout is active that was set by IO errors.
if let Some(ref mut to) = self.timeout {
match to.poll().expect("timeout never fails") {
Async::Ready(_) => {}
Async::NotReady => return Ok(Async::NotReady),
}
}
self.timeout = None;
loop {
match self.listener.poll_accept() {
Ok(Async::Ready((socket, addr))) => {
if let Some(dur) = self.tcp_keepalive_timeout {
if let Err(e) = socket.set_keepalive(Some(dur)) {
trace!("error trying to set TCP keepalive: {}", e);
}
}
if let Err(e) = socket.set_nodelay(self.tcp_nodelay) {
trace!("error trying to set TCP nodelay: {}", e);
}
return Ok(Async::Ready(Some(AddrStream::new(socket, addr))));
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(ref e) if self.sleep_on_errors => {
// Connection errors can be ignored directly, continue by
// accepting the next request.
if is_connection_error(e) {
debug!("accepted connection already errored: {}", e);
continue;
}
// Sleep 1s.
let delay = Duration::from_secs(1);
error!("accept error: {}", e);
let mut timeout = Delay::new(delay);
let result = timeout.poll()
.expect("timeout never fails");
match result {
Async::Ready(()) => continue,
Async::NotReady => {
self.timeout = Some(timeout);
return Ok(Async::NotReady);
}
}
},
Err(e) => return Err(e),
}
}
}
}
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted.
///
/// All other errors will incur a timeout before next `accept()` is performed.
/// The timeout is useful to handle resource exhaustion errors like ENFILE
/// and EMFILE. Otherwise, could enter into tight loop.
fn is_connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused ||
e.kind() == io::ErrorKind::ConnectionAborted ||
e.kind() == io::ErrorKind::ConnectionReset
}
impl fmt::Debug for AddrIncoming {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("AddrIncoming")
.field("addr", &self.addr)
.field("sleep_on_errors", &self.sleep_on_errors)
.field("tcp_keepalive_timeout", &self.tcp_keepalive_timeout)
.field("tcp_nodelay", &self.tcp_nodelay)
.finish()
}
}
mod addr_stream {
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use bytes::{Buf, BufMut};
use futures::Poll;
use tokio::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct AddrStream {
inner: TcpStream,
pub(super) remote_addr: SocketAddr,
}
impl AddrStream {
pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream {
AddrStream {
inner: tcp,
remote_addr: addr,
}
}
}
impl Read for AddrStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}
impl Write for AddrStream {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}
#[inline]
fn flush(&mut self ) -> io::Result<()> {
self.inner.flush()
}
}
impl AsyncRead for AddrStream {
#[inline]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
#[inline]
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.read_buf(buf)
}
}
impl AsyncWrite for AddrStream {
#[inline]
fn shutdown(&mut self) -> Poll<(), io::Error> {
AsyncWrite::shutdown(&mut self.inner)
}
#[inline]
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.write_buf(buf)
}
}
}

View File

@@ -1,3 +1,4 @@
#![deny(warnings)]
#[macro_use]
mod support;
use self::support::*;

View File

@@ -24,7 +24,6 @@ use futures::future::{self, FutureResult, Either};
use futures::sync::oneshot;
use futures_timer::Delay;
use http::header::{HeaderName, HeaderValue};
//use net2::TcpBuilder;
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio::reactor::Handle;
@@ -32,7 +31,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
use hyper::{Body, Request, Response, StatusCode};
use hyper::server::{Http, Service, NewService, service_fn};
use hyper::server::{Service, NewService, service_fn};
use hyper::server::conn::Http;
fn tcp_bind(addr: &SocketAddr, handle: &Handle) -> ::tokio::io::Result<TcpListener> {
let std_listener = StdTcpListener::bind(addr).unwrap();
@@ -1363,8 +1363,9 @@ fn serve_with_options(options: ServeOptions) -> Serve {
let (msg_tx, msg_rx) = mpsc::channel();
let (reply_tx, reply_rx) = spmc::channel();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let shutdown_rx = shutdown_rx.then(|_| Ok(()));
let addr = "127.0.0.1:0".parse().unwrap();
let addr = ([127, 0, 0, 1], 0).into();
let keep_alive = !options.keep_alive_disabled;
let pipeline = options.pipeline;
@@ -1372,22 +1373,40 @@ fn serve_with_options(options: ServeOptions) -> Serve {
let thread_name = format!("test-server-{:?}", dur);
let thread = thread::Builder::new().name(thread_name).spawn(move || {
tokio::run(::futures::future::lazy(move || {
let srv = Http::new()
.keep_alive(keep_alive)
.pipeline(pipeline)
.bind(&addr, TestService {
tx: Arc::new(Mutex::new(msg_tx.clone())),
_timeout: dur,
reply: reply_rx,
}).unwrap();
addr_tx.send(srv.local_addr().unwrap()).unwrap();
srv.run_until(shutdown_rx.then(|_| Ok(())))
.map_err(|err| println!("error {}", err))
}))
}).unwrap();
let serve = Http::new()
.keep_alive(keep_alive)
.pipeline_flush(pipeline)
.serve_addr(&addr, TestService {
tx: Arc::new(Mutex::new(msg_tx.clone())),
_timeout: dur,
reply: reply_rx,
})
.expect("bind to address");
let addr = addr_rx.recv().unwrap();
addr_tx.send(
serve
.incoming_ref()
.local_addr()
).expect("server addr tx");
// spawn_all() is private for now, so just duplicate it here
let spawn_all = serve.for_each(|conn| {
tokio::spawn(conn.map_err(|e| {
println!("server error: {}", e);
}));
Ok(())
}).map_err(|e| {
println!("accept error: {}", e)
});
let fut = spawn_all
.select(shutdown_rx)
.then(|_| Ok(()));
tokio::run(fut);
}).expect("thread spawn");
let addr = addr_rx.recv().expect("server addr rx");
Serve {
msg_rx: msg_rx,

View File

@@ -123,9 +123,11 @@ macro_rules! __internal_req_res_prop {
);
(headers: $map:tt) => ({
#[allow(unused_mut)]
{
let mut headers = HeaderMap::new();
__internal_headers!(headers, $map);
headers
}
});
($prop_name:ident: $prop_val:expr) => (
From::from($prop_val)
@@ -228,7 +230,7 @@ pub fn __run_test(cfg: __TestConfig) {
});
let new_service = hyper::server::const_service(service);
let serve = hyper::server::Http::new()
let serve = hyper::server::conn::Http::new()
.http2_only(cfg.server_version == 2)
.executor(rt.executor())
.serve_addr_handle(
@@ -244,7 +246,7 @@ pub fn __run_test(cfg: __TestConfig) {
let (success_tx, success_rx) = oneshot::channel();
let expected_connections = cfg.connections;
let server = serve
.fold(0, move |cnt, conn: hyper::server::Connection<_, _>| {
.fold(0, move |cnt, conn| {
exe.spawn(conn.map_err(|e| panic!("server connection error: {}", e)));
Ok::<_, hyper::Error>(cnt + 1)
})