Merge pull request #1488 from hyperium/server2
feat(server): re-design `Server` as higher-level API
This commit is contained in:
@@ -13,7 +13,8 @@ use tokio::runtime::Runtime;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use hyper::{Body, Method, Request, Response};
|
||||
use hyper::server::Http;
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::server::conn::Http;
|
||||
|
||||
|
||||
#[bench]
|
||||
@@ -21,8 +22,10 @@ fn get_one_at_a_time(b: &mut test::Bencher) {
|
||||
let mut rt = Runtime::new().unwrap();
|
||||
let addr = spawn_hello(&mut rt);
|
||||
|
||||
let client = hyper::Client::configure()
|
||||
.build_with_executor(&rt.reactor(), rt.executor());
|
||||
let connector = HttpConnector::new_with_handle(1, rt.reactor().clone());
|
||||
let client = hyper::Client::builder()
|
||||
.executor(rt.executor())
|
||||
.build::<_, Body>(connector);
|
||||
|
||||
let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap();
|
||||
|
||||
@@ -43,8 +46,10 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
|
||||
let mut rt = Runtime::new().unwrap();
|
||||
let addr = spawn_hello(&mut rt);
|
||||
|
||||
let client = hyper::Client::configure()
|
||||
.build_with_executor(&rt.reactor(), rt.executor());
|
||||
let connector = HttpConnector::new_with_handle(1, rt.reactor().clone());
|
||||
let client = hyper::Client::builder()
|
||||
.executor(rt.executor())
|
||||
.build::<_, Body>(connector);
|
||||
|
||||
let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap();
|
||||
|
||||
@@ -71,7 +76,7 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
|
||||
let listener = TcpListener::bind(&addr).unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
|
||||
let http = Http::<hyper::Chunk>::new();
|
||||
let http = Http::new();
|
||||
|
||||
let service = const_service(service_fn(|req: Request<Body>| {
|
||||
req.into_body()
|
||||
@@ -81,6 +86,7 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
|
||||
})
|
||||
}));
|
||||
|
||||
// Specifically only accept 1 connection.
|
||||
let srv = listener.incoming()
|
||||
.into_future()
|
||||
.map_err(|(e, _inc)| panic!("accept error: {}", e))
|
||||
|
||||
@@ -14,24 +14,28 @@ use std::sync::mpsc;
|
||||
use futures::{future, stream, Future, Stream};
|
||||
use futures::sync::oneshot;
|
||||
|
||||
use hyper::{Body, Request, Response};
|
||||
use hyper::{Body, Request, Response, Server};
|
||||
use hyper::server::Service;
|
||||
|
||||
macro_rules! bench_server {
|
||||
($b:ident, $header:expr, $body:expr) => ({
|
||||
let _ = pretty_env_logger::try_init();
|
||||
let (_until_tx, until_rx) = oneshot::channel();
|
||||
let (_until_tx, until_rx) = oneshot::channel::<()>();
|
||||
let addr = {
|
||||
let (addr_tx, addr_rx) = mpsc::channel();
|
||||
::std::thread::spawn(move || {
|
||||
let addr = "127.0.0.1:0".parse().unwrap();
|
||||
let srv = hyper::server::Http::new().bind(&addr, || Ok(BenchPayload {
|
||||
header: $header,
|
||||
body: $body,
|
||||
})).unwrap();
|
||||
let addr = srv.local_addr().unwrap();
|
||||
addr_tx.send(addr).unwrap();
|
||||
tokio::run(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
|
||||
let srv = Server::bind(&addr)
|
||||
.serve(|| Ok(BenchPayload {
|
||||
header: $header,
|
||||
body: $body,
|
||||
}));
|
||||
addr_tx.send(srv.local_addr()).unwrap();
|
||||
let fut = srv
|
||||
.map_err(|e| panic!("server error: {}", e))
|
||||
.select(until_rx.then(|_| Ok(())))
|
||||
.then(|_| Ok(()));
|
||||
tokio::run(fut);
|
||||
});
|
||||
|
||||
addr_rx.recv().unwrap()
|
||||
|
||||
@@ -5,15 +5,15 @@ extern crate pretty_env_logger;
|
||||
extern crate tokio;
|
||||
|
||||
use futures::Future;
|
||||
use futures::future::lazy;
|
||||
|
||||
use hyper::{Body, Response};
|
||||
use hyper::server::{Http, const_service, service_fn};
|
||||
use hyper::server::{Server, const_service, service_fn};
|
||||
|
||||
static PHRASE: &'static [u8] = b"Hello World!";
|
||||
|
||||
fn main() {
|
||||
pretty_env_logger::init();
|
||||
|
||||
let addr = ([127, 0, 0, 1], 3000).into();
|
||||
|
||||
let new_service = const_service(service_fn(|_| {
|
||||
@@ -21,13 +21,11 @@ fn main() {
|
||||
Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE)))
|
||||
}));
|
||||
|
||||
tokio::run(lazy(move || {
|
||||
let server = Http::new()
|
||||
.sleep_on_errors(true)
|
||||
.bind(&addr, new_service)
|
||||
.unwrap();
|
||||
let server = Server::bind(&addr)
|
||||
.serve(new_service)
|
||||
.map_err(|e| eprintln!("server error: {}", e));
|
||||
|
||||
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
|
||||
server.run().map_err(|err| eprintln!("Server error {}", err))
|
||||
}));
|
||||
println!("Listening on http://{}", addr);
|
||||
|
||||
tokio::run(server);
|
||||
}
|
||||
|
||||
@@ -4,11 +4,11 @@ extern crate futures;
|
||||
extern crate pretty_env_logger;
|
||||
extern crate tokio;
|
||||
|
||||
use futures::{Future, Stream};
|
||||
use futures::{Future};
|
||||
use futures::future::{FutureResult, lazy};
|
||||
|
||||
use hyper::{Body, Method, Request, Response, StatusCode};
|
||||
use hyper::server::{Http, Service};
|
||||
use hyper::server::{Server, Service};
|
||||
|
||||
static INDEX1: &'static [u8] = b"The 1st service!";
|
||||
static INDEX2: &'static [u8] = b"The 2nd service!";
|
||||
@@ -40,25 +40,23 @@ impl Service for Srv {
|
||||
|
||||
fn main() {
|
||||
pretty_env_logger::init();
|
||||
let addr1 = "127.0.0.1:1337".parse().unwrap();
|
||||
let addr2 = "127.0.0.1:1338".parse().unwrap();
|
||||
|
||||
let addr1 = ([127, 0, 0, 1], 1337).into();
|
||||
let addr2 = ([127, 0, 0, 1], 1338).into();
|
||||
|
||||
tokio::run(lazy(move || {
|
||||
let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap();
|
||||
let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap();
|
||||
let srv1 = Server::bind(&addr1)
|
||||
.serve(|| Ok(Srv(INDEX1)))
|
||||
.map_err(|e| eprintln!("server 1 error: {}", e));
|
||||
|
||||
println!("Listening on http://{}", srv1.incoming_ref().local_addr());
|
||||
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
|
||||
let srv2 = Server::bind(&addr2)
|
||||
.serve(|| Ok(Srv(INDEX2)))
|
||||
.map_err(|e| eprintln!("server 2 error: {}", e));
|
||||
|
||||
tokio::spawn(srv1.for_each(move |conn| {
|
||||
tokio::spawn(conn.map_err(|err| println!("srv1 error: {:?}", err)));
|
||||
Ok(())
|
||||
}).map_err(|_| ()));
|
||||
println!("Listening on http://{} and http://{}", addr1, addr2);
|
||||
|
||||
tokio::spawn(srv2.for_each(move |conn| {
|
||||
tokio::spawn(conn.map_err(|err| println!("srv2 error: {:?}", err)));
|
||||
Ok(())
|
||||
}).map_err(|_| ()));
|
||||
tokio::spawn(srv1);
|
||||
tokio::spawn(srv2);
|
||||
|
||||
Ok(())
|
||||
}));
|
||||
|
||||
@@ -6,10 +6,9 @@ extern crate tokio;
|
||||
extern crate url;
|
||||
|
||||
use futures::{Future, Stream};
|
||||
use futures::future::lazy;
|
||||
|
||||
use hyper::{Body, Method, Request, Response, StatusCode};
|
||||
use hyper::server::{Http, Service};
|
||||
use hyper::server::{Server, Service};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use url::form_urlencoded;
|
||||
@@ -96,11 +95,12 @@ impl Service for ParamExample {
|
||||
|
||||
fn main() {
|
||||
pretty_env_logger::init();
|
||||
let addr = "127.0.0.1:1337".parse().unwrap();
|
||||
|
||||
tokio::run(lazy(move || {
|
||||
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
|
||||
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
|
||||
server.run().map_err(|err| eprintln!("Server error {}", err))
|
||||
}));
|
||||
let addr = ([127, 0, 0, 1], 1337).into();
|
||||
|
||||
let server = Server::bind(&addr)
|
||||
.serve(|| Ok(ParamExample))
|
||||
.map_err(|e| eprintln!("server error: {}", e));
|
||||
|
||||
tokio::run(server);
|
||||
}
|
||||
|
||||
@@ -5,11 +5,10 @@ extern crate pretty_env_logger;
|
||||
extern crate tokio;
|
||||
|
||||
use futures::{Future/*, Sink*/};
|
||||
use futures::future::lazy;
|
||||
use futures::sync::oneshot;
|
||||
|
||||
use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
|
||||
use hyper::server::{Http, Service};
|
||||
use hyper::server::{Server, Service};
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::{self, copy/*, Read*/};
|
||||
@@ -138,11 +137,14 @@ impl Service for ResponseExamples {
|
||||
|
||||
fn main() {
|
||||
pretty_env_logger::init();
|
||||
|
||||
let addr = "127.0.0.1:1337".parse().unwrap();
|
||||
|
||||
tokio::run(lazy(move || {
|
||||
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
|
||||
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
|
||||
server.run().map_err(|err| eprintln!("Server error {}", err))
|
||||
}));
|
||||
let server = Server::bind(&addr)
|
||||
.serve(|| Ok(ResponseExamples))
|
||||
.map_err(|e| eprintln!("server error: {}", e));
|
||||
|
||||
println!("Listening on http://{}", addr);
|
||||
|
||||
tokio::run(server);
|
||||
}
|
||||
|
||||
@@ -5,10 +5,10 @@ extern crate pretty_env_logger;
|
||||
extern crate tokio;
|
||||
|
||||
use futures::Future;
|
||||
use futures::future::{FutureResult, lazy};
|
||||
use futures::future::{FutureResult};
|
||||
|
||||
use hyper::{Body, Method, Request, Response, StatusCode};
|
||||
use hyper::server::{Http, Service};
|
||||
use hyper::server::{Server, Service};
|
||||
|
||||
static INDEX: &'static [u8] = b"Try POST /echo";
|
||||
|
||||
@@ -41,11 +41,14 @@ impl Service for Echo {
|
||||
|
||||
fn main() {
|
||||
pretty_env_logger::init();
|
||||
let addr = "127.0.0.1:1337".parse().unwrap();
|
||||
|
||||
tokio::run(lazy(move || {
|
||||
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
|
||||
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
|
||||
server.run().map_err(|err| eprintln!("Server error {}", err))
|
||||
}));
|
||||
let addr = ([127, 0, 0, 1], 1337).into();
|
||||
|
||||
let server = Server::bind(&addr)
|
||||
.serve(|| Ok(Echo))
|
||||
.map_err(|e| eprintln!("server error: {}", e));
|
||||
|
||||
println!("Listening on http://{}", addr);
|
||||
|
||||
tokio::run(server);
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ use futures::future::lazy;
|
||||
|
||||
use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode};
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::server::{Http, Service};
|
||||
use hyper::server::{Server, Service};
|
||||
|
||||
#[allow(unused, deprecated)]
|
||||
use std::ascii::AsciiExt;
|
||||
@@ -75,15 +75,17 @@ impl Service for ResponseExamples {
|
||||
|
||||
fn main() {
|
||||
pretty_env_logger::init();
|
||||
|
||||
let addr = "127.0.0.1:1337".parse().unwrap();
|
||||
|
||||
tokio::run(lazy(move || {
|
||||
let client = Client::new();
|
||||
let serve = Http::new().serve_addr(&addr, move || Ok(ResponseExamples(client.clone()))).unwrap();
|
||||
println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());
|
||||
let server = Server::bind(&addr)
|
||||
.serve(move || Ok(ResponseExamples(client.clone())))
|
||||
.map_err(|e| eprintln!("server error: {}", e));
|
||||
|
||||
serve.map_err(|_| ()).for_each(move |conn| {
|
||||
tokio::spawn(conn.map_err(|err| println!("serve error: {:?}", err)))
|
||||
})
|
||||
println!("Listening on http://{}", addr);
|
||||
|
||||
server
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -171,12 +171,12 @@ impl Error {
|
||||
Error::new(Kind::Io, Some(cause.into()))
|
||||
}
|
||||
|
||||
pub(crate) fn new_listen(err: io::Error) -> Error {
|
||||
Error::new(Kind::Listen, Some(err.into()))
|
||||
pub(crate) fn new_listen<E: Into<Cause>>(cause: E) -> Error {
|
||||
Error::new(Kind::Listen, Some(cause.into()))
|
||||
}
|
||||
|
||||
pub(crate) fn new_accept(err: io::Error) -> Error {
|
||||
Error::new(Kind::Accept, Some(Box::new(err)))
|
||||
pub(crate) fn new_accept<E: Into<Cause>>(cause: E) -> Error {
|
||||
Error::new(Kind::Accept, Some(cause.into()))
|
||||
}
|
||||
|
||||
pub(crate) fn new_connect<E: Into<Cause>>(cause: E) -> Error {
|
||||
|
||||
@@ -10,8 +10,19 @@ use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use proto::{Http1Transaction, MessageHead};
|
||||
|
||||
const INIT_BUFFER_SIZE: usize = 8192;
|
||||
pub const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
|
||||
/// The initial buffer size allocated before trying to read from IO.
|
||||
pub(crate) const INIT_BUFFER_SIZE: usize = 8192;
|
||||
|
||||
/// The default maximum read buffer size. If the buffer gets this big and
|
||||
/// a message is still not complete, a `TooLarge` error is triggered.
|
||||
// Note: if this changes, update server::conn::Http::max_buf_size docs.
|
||||
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
|
||||
|
||||
/// The maximum number of distinct `Buf`s to hold in a list before requiring
|
||||
/// a flush. Only affects when the buffer strategy is to queue buffers.
|
||||
///
|
||||
/// Note that a flush can happen before reaching the maximum. This simply
|
||||
/// forces a flush if the queue gets this big.
|
||||
const MAX_BUF_LIST_BUFFERS: usize = 16;
|
||||
|
||||
pub struct Buffered<T, B> {
|
||||
|
||||
@@ -5,19 +5,59 @@
|
||||
//! are not handled at this level. This module provides the building blocks to
|
||||
//! customize those things externally.
|
||||
//!
|
||||
//! If don't have need to manage connections yourself, consider using the
|
||||
//! If you don't have need to manage connections yourself, consider using the
|
||||
//! higher-level [Server](super) API.
|
||||
|
||||
use std::fmt;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{Future, Poll};
|
||||
use futures::future::{Either};
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use futures::future::{Either, Executor};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
//TODO: change these tokio:: to sub-crates
|
||||
use tokio::reactor::Handle;
|
||||
|
||||
use common::Exec;
|
||||
use proto;
|
||||
use body::{Body, Payload};
|
||||
use super::{HyperService, Request, Response, Service};
|
||||
use super::{HyperService, NewService, Request, Response, Service};
|
||||
|
||||
pub use super::tcp::AddrIncoming;
|
||||
|
||||
/// A lower-level configuration of the HTTP protocol.
|
||||
///
|
||||
/// This structure is used to configure options for an HTTP server connection.
|
||||
///
|
||||
/// If don't have need to manage connections yourself, consider using the
|
||||
/// higher-level [Server](super) API.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Http {
|
||||
exec: Exec,
|
||||
http2: bool,
|
||||
keep_alive: bool,
|
||||
max_buf_size: Option<usize>,
|
||||
pipeline_flush: bool,
|
||||
}
|
||||
|
||||
/// A stream mapping incoming IOs to new services.
|
||||
///
|
||||
/// Yields `Connection`s that are futures that should be put on a reactor.
|
||||
#[must_use = "streams do nothing unless polled"]
|
||||
#[derive(Debug)]
|
||||
pub struct Serve<I, S> {
|
||||
incoming: I,
|
||||
new_service: S,
|
||||
protocol: Http,
|
||||
}
|
||||
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
#[derive(Debug)]
|
||||
pub(super) struct SpawnAll<I, S> {
|
||||
serve: Serve<I, S>,
|
||||
}
|
||||
|
||||
/// A future binding a connection with a Service.
|
||||
///
|
||||
@@ -63,6 +103,187 @@ pub struct Parts<T> {
|
||||
_inner: (),
|
||||
}
|
||||
|
||||
// ===== impl Http =====
|
||||
|
||||
impl Http {
|
||||
/// Creates a new instance of the HTTP protocol, ready to spawn a server or
|
||||
/// start accepting connections.
|
||||
pub fn new() -> Http {
|
||||
Http {
|
||||
exec: Exec::Default,
|
||||
http2: false,
|
||||
keep_alive: true,
|
||||
max_buf_size: None,
|
||||
pipeline_flush: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets whether HTTP2 is required.
|
||||
///
|
||||
/// Default is false
|
||||
pub fn http2_only(&mut self, val: bool) -> &mut Self {
|
||||
self.http2 = val;
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables or disables HTTP keep-alive.
|
||||
///
|
||||
/// Default is true.
|
||||
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
|
||||
self.keep_alive = val;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the maximum buffer size for the connection.
|
||||
///
|
||||
/// Default is ~400kb.
|
||||
pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
|
||||
self.max_buf_size = Some(max);
|
||||
self
|
||||
}
|
||||
|
||||
/// Aggregates flushes to better support pipelined responses.
|
||||
///
|
||||
/// Experimental, may be have bugs.
|
||||
///
|
||||
/// Default is false.
|
||||
pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
|
||||
self.pipeline_flush = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the executor used to spawn background tasks.
|
||||
///
|
||||
/// Default uses implicit default (like `tokio::spawn`).
|
||||
pub fn executor<E>(&mut self, exec: E) -> &mut Self
|
||||
where
|
||||
E: Executor<Box<Future<Item=(), Error=()> + Send>> + Send + Sync + 'static
|
||||
{
|
||||
self.exec = Exec::Executor(Arc::new(exec));
|
||||
self
|
||||
}
|
||||
|
||||
/// Bind a connection together with a `Service`.
|
||||
///
|
||||
/// This returns a Future that must be polled in order for HTTP to be
|
||||
/// driven on the connection.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # extern crate futures;
|
||||
/// # extern crate hyper;
|
||||
/// # extern crate tokio;
|
||||
/// # extern crate tokio_io;
|
||||
/// # use futures::Future;
|
||||
/// # use hyper::{Body, Request, Response};
|
||||
/// # use hyper::server::Service;
|
||||
/// # use hyper::server::conn::Http;
|
||||
/// # use tokio_io::{AsyncRead, AsyncWrite};
|
||||
/// # use tokio::reactor::Handle;
|
||||
/// # fn run<I, S>(some_io: I, some_service: S)
|
||||
/// # where
|
||||
/// # I: AsyncRead + AsyncWrite + Send + 'static,
|
||||
/// # S: Service<Request=Request<Body>, Response=Response<Body>, Error=hyper::Error> + Send + 'static,
|
||||
/// # S::Future: Send
|
||||
/// # {
|
||||
/// let http = Http::new();
|
||||
/// let conn = http.serve_connection(some_io, some_service);
|
||||
///
|
||||
/// let fut = conn.map_err(|e| {
|
||||
/// eprintln!("server connection error: {}", e);
|
||||
/// });
|
||||
///
|
||||
/// tokio::spawn(fut);
|
||||
/// # }
|
||||
/// # fn main() {}
|
||||
/// ```
|
||||
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S>
|
||||
where
|
||||
S: Service<Request = Request<Body>, Response = Response<Bd>>,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
S::Future: Send + 'static,
|
||||
Bd: Payload,
|
||||
I: AsyncRead + AsyncWrite,
|
||||
{
|
||||
let either = if !self.http2 {
|
||||
let mut conn = proto::Conn::new(io);
|
||||
if !self.keep_alive {
|
||||
conn.disable_keep_alive();
|
||||
}
|
||||
conn.set_flush_pipeline(self.pipeline_flush);
|
||||
if let Some(max) = self.max_buf_size {
|
||||
conn.set_max_buf_size(max);
|
||||
}
|
||||
let sd = proto::h1::dispatch::Server::new(service);
|
||||
Either::A(proto::h1::Dispatcher::new(sd, conn))
|
||||
} else {
|
||||
let h2 = proto::h2::Server::new(io, service, self.exec.clone());
|
||||
Either::B(h2)
|
||||
};
|
||||
|
||||
Connection {
|
||||
conn: either,
|
||||
}
|
||||
}
|
||||
|
||||
/// Bind the provided `addr` with the default `Handle` and return [`Serve`](Serve).
|
||||
///
|
||||
/// This method will bind the `addr` provided with a new TCP listener ready
|
||||
/// to accept connections. Each connection will be processed with the
|
||||
/// `new_service` object provided, creating a new service per
|
||||
/// connection.
|
||||
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
|
||||
where
|
||||
S: NewService<Request=Request<Body>, Response=Response<Bd>>,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
Bd: Payload,
|
||||
{
|
||||
let mut incoming = AddrIncoming::new(addr, None)?;
|
||||
if self.keep_alive {
|
||||
incoming.set_keepalive(Some(Duration::from_secs(90)));
|
||||
}
|
||||
Ok(self.serve_incoming(incoming, new_service))
|
||||
}
|
||||
|
||||
/// Bind the provided `addr` with the `Handle` and return a [`Serve`](Serve)
|
||||
///
|
||||
/// This method will bind the `addr` provided with a new TCP listener ready
|
||||
/// to accept connections. Each connection will be processed with the
|
||||
/// `new_service` object provided, creating a new service per
|
||||
/// connection.
|
||||
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
|
||||
where
|
||||
S: NewService<Request = Request<Body>, Response = Response<Bd>>,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
Bd: Payload,
|
||||
{
|
||||
let mut incoming = AddrIncoming::new(addr, Some(handle))?;
|
||||
if self.keep_alive {
|
||||
incoming.set_keepalive(Some(Duration::from_secs(90)));
|
||||
}
|
||||
Ok(self.serve_incoming(incoming, new_service))
|
||||
}
|
||||
|
||||
/// Bind the provided stream of incoming IO objects with a `NewService`.
|
||||
pub fn serve_incoming<I, S, Bd>(&self, incoming: I, new_service: S) -> Serve<I, S>
|
||||
where
|
||||
I: Stream,
|
||||
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
I::Item: AsyncRead + AsyncWrite,
|
||||
S: NewService<Request = Request<Body>, Response = Response<Bd>>,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
Bd: Payload,
|
||||
{
|
||||
Serve {
|
||||
incoming: incoming,
|
||||
new_service: new_service,
|
||||
protocol: self.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ===== impl Connection =====
|
||||
|
||||
impl<I, B, S> Connection<I, S>
|
||||
@@ -154,3 +375,89 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Serve =====
|
||||
|
||||
impl<I, S> Serve<I, S> {
|
||||
/// Spawn all incoming connections onto the executor in `Http`.
|
||||
pub(super) fn spawn_all(self) -> SpawnAll<I, S> {
|
||||
SpawnAll {
|
||||
serve: self,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a reference to the incoming stream.
|
||||
#[inline]
|
||||
pub fn incoming_ref(&self) -> &I {
|
||||
&self.incoming
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the incoming stream.
|
||||
#[inline]
|
||||
pub fn incoming_mut(&mut self) -> &mut I {
|
||||
&mut self.incoming
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S, B> Stream for Serve<I, S>
|
||||
where
|
||||
I: Stream,
|
||||
I::Item: AsyncRead + AsyncWrite,
|
||||
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
S: NewService<Request=Request<Body>, Response=Response<B>>,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
<S::Instance as Service>::Future: Send + 'static,
|
||||
B: Payload,
|
||||
{
|
||||
type Item = Connection<I::Item, S::Instance>;
|
||||
type Error = ::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) {
|
||||
let service = self.new_service.new_service().map_err(::Error::new_user_new_service)?;
|
||||
Ok(Async::Ready(Some(self.protocol.serve_connection(io, service))))
|
||||
} else {
|
||||
Ok(Async::Ready(None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl SpawnAll =====
|
||||
|
||||
impl<S> SpawnAll<AddrIncoming, S> {
|
||||
pub(super) fn local_addr(&self) -> SocketAddr {
|
||||
self.serve.incoming.local_addr()
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S> SpawnAll<I, S> {
|
||||
pub(super) fn incoming_ref(&self) -> &I {
|
||||
self.serve.incoming_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S, B> Future for SpawnAll<I, S>
|
||||
where
|
||||
I: Stream,
|
||||
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
I::Item: AsyncRead + AsyncWrite + Send + 'static,
|
||||
S: NewService<Request = Request<Body>, Response = Response<B>> + Send + 'static,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
<S as NewService>::Instance: Send,
|
||||
<<S as NewService>::Instance as Service>::Future: Send + 'static,
|
||||
B: Payload,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
loop {
|
||||
if let Some(conn) = try_ready!(self.serve.poll()) {
|
||||
let fut = conn
|
||||
.map_err(|err| debug!("conn error: {}", err));
|
||||
self.serve.protocol.exec.execute(fut);
|
||||
} else {
|
||||
return Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,797 +2,175 @@
|
||||
//!
|
||||
//! A `Server` is created to listen on a port, parse HTTP requests, and hand
|
||||
//! them off to a `Service`.
|
||||
//!
|
||||
//! There are two levels of APIs provide for constructing HTTP servers:
|
||||
//!
|
||||
//! - The higher-level [`Server`](Server).
|
||||
//! - The lower-level [conn](conn) module.
|
||||
|
||||
pub mod conn;
|
||||
mod service;
|
||||
mod tcp;
|
||||
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::{SocketAddr, TcpListener as StdTcpListener};
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::task::{self, Task};
|
||||
use futures::future::{self, Either, Executor};
|
||||
use futures::{Future, Stream, Poll, Async};
|
||||
use futures_timer::Delay;
|
||||
use futures::{Future, Stream, Poll};
|
||||
use http::{Request, Response};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio::spawn;
|
||||
use tokio::reactor::Handle;
|
||||
use tokio::net::TcpListener;
|
||||
pub use tokio_service::{NewService, Service};
|
||||
|
||||
use body::{Body, Payload};
|
||||
use common::Exec;
|
||||
use proto;
|
||||
use self::addr_stream::AddrStream;
|
||||
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
|
||||
// error that `hyper::server::Http` is private...
|
||||
use self::conn::{Http as Http_, SpawnAll};
|
||||
use self::hyper_service::HyperService;
|
||||
use self::tcp::{AddrIncoming};
|
||||
|
||||
pub use self::conn::Connection;
|
||||
pub use self::service::{const_service, service_fn};
|
||||
|
||||
/// A configuration of the HTTP protocol.
|
||||
/// A listening HTTP server.
|
||||
///
|
||||
/// This structure is used to create instances of `Server` or to spawn off tasks
|
||||
/// which handle a connection to an HTTP server. Each instance of `Http` can be
|
||||
/// configured with various protocol-level options such as keepalive.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Http {
|
||||
exec: Exec,
|
||||
http2: bool,
|
||||
keep_alive: bool,
|
||||
max_buf_size: Option<usize>,
|
||||
pipeline: bool,
|
||||
sleep_on_errors: bool,
|
||||
/// `Server` is a `Future` mapping a bound listener with a set of service
|
||||
/// handlers. It is built using the [`Builder`](Builder), and the future
|
||||
/// completes when the server has been shutdown. It should be run by an
|
||||
/// `Executor`.
|
||||
pub struct Server<I, S> {
|
||||
spawn_all: SpawnAll<I, S>,
|
||||
}
|
||||
|
||||
/// An instance of a server created through `Http::bind`.
|
||||
///
|
||||
/// This server is intended as a convenience for creating a TCP listener on an
|
||||
/// address and then serving TCP connections accepted with the service provided.
|
||||
pub struct Server<S> {
|
||||
protocol: Http,
|
||||
new_service: S,
|
||||
handle: Handle,
|
||||
listener: TcpListener,
|
||||
shutdown_timeout: Duration,
|
||||
}
|
||||
|
||||
/// A stream mapping incoming IOs to new services.
|
||||
///
|
||||
/// Yields `Connection`s that are futures that should be put on a reactor.
|
||||
#[must_use = "streams do nothing unless polled"]
|
||||
/// A builder for a [`Server`](Server).
|
||||
#[derive(Debug)]
|
||||
pub struct Serve<I, S> {
|
||||
pub struct Builder<I> {
|
||||
incoming: I,
|
||||
new_service: S,
|
||||
protocol: Http,
|
||||
protocol: Http_,
|
||||
}
|
||||
|
||||
/*
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
#[derive(Debug)]
|
||||
pub struct SpawnAll<I, S, E> {
|
||||
executor: E,
|
||||
serve: Serve<I, S>,
|
||||
}
|
||||
*/
|
||||
|
||||
/// A stream of connections from binding to an address.
|
||||
#[must_use = "streams do nothing unless polled"]
|
||||
pub struct AddrIncoming {
|
||||
addr: SocketAddr,
|
||||
keep_alive_timeout: Option<Duration>,
|
||||
listener: TcpListener,
|
||||
handle: Handle,
|
||||
sleep_on_errors: bool,
|
||||
timeout: Option<Delay>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for AddrIncoming {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("AddrIncoming")
|
||||
.field("addr", &self.addr)
|
||||
.field("keep_alive_timeout", &self.keep_alive_timeout)
|
||||
.field("listener", &self.listener)
|
||||
.field("handle", &self.handle)
|
||||
.field("sleep_on_errors", &self.sleep_on_errors)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Http =====
|
||||
|
||||
impl Http {
|
||||
/// Creates a new instance of the HTTP protocol, ready to spawn a server or
|
||||
/// start accepting connections.
|
||||
pub fn new() -> Http {
|
||||
Http {
|
||||
exec: Exec::Default,
|
||||
http2: false,
|
||||
keep_alive: true,
|
||||
max_buf_size: None,
|
||||
pipeline: false,
|
||||
sleep_on_errors: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets whether HTTP2 is required.
|
||||
///
|
||||
/// Default is false
|
||||
pub fn http2_only(&mut self, val: bool) -> &mut Self {
|
||||
self.http2 = val;
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables or disables HTTP keep-alive.
|
||||
///
|
||||
/// Default is true.
|
||||
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
|
||||
self.keep_alive = val;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the maximum buffer size for the connection.
|
||||
pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
|
||||
self.max_buf_size = Some(max);
|
||||
self
|
||||
}
|
||||
|
||||
/// Aggregates flushes to better support pipelined responses.
|
||||
///
|
||||
/// Experimental, may be have bugs.
|
||||
///
|
||||
/// Default is false.
|
||||
pub fn pipeline(&mut self, enabled: bool) -> &mut Self {
|
||||
self.pipeline = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the executor used to spawn background tasks.
|
||||
///
|
||||
/// Default uses implicit default (like `tokio::spawn`).
|
||||
pub fn executor<E>(&mut self, exec: E) -> &mut Self
|
||||
where
|
||||
E: Executor<Box<Future<Item=(), Error=()> + Send>> + Send + Sync + 'static
|
||||
{
|
||||
self.exec = Exec::Executor(Arc::new(exec));
|
||||
self
|
||||
}
|
||||
|
||||
/// Swallow connection accept errors. Instead of passing up IO errors when
|
||||
/// the server is under heavy load the errors will be ignored. Some
|
||||
/// connection accept errors (like "connection reset") can be ignored, some
|
||||
/// (like "too many files open") may consume 100% CPU and a timout of 10ms
|
||||
/// is used in that case.
|
||||
///
|
||||
/// Default is false.
|
||||
pub fn sleep_on_errors(&mut self, enabled: bool) -> &mut Self {
|
||||
self.sleep_on_errors = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Bind the provided `addr` and return a server ready to handle
|
||||
/// connections.
|
||||
///
|
||||
/// This method will bind the `addr` provided with a new TCP listener ready
|
||||
/// to accept connections. Each connection will be processed with the
|
||||
/// `new_service` object provided as well, creating a new service per
|
||||
/// connection.
|
||||
///
|
||||
/// The returned `Server` contains one method, `run`, which is used to
|
||||
/// actually run the server.
|
||||
pub fn bind<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Server<S>>
|
||||
where
|
||||
S: NewService<Request=Request<Body>, Response=Response<Bd>> + 'static,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
Bd: Payload,
|
||||
{
|
||||
let handle = Handle::current();
|
||||
let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?;
|
||||
let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?;
|
||||
|
||||
Ok(Server {
|
||||
new_service: new_service,
|
||||
handle: handle,
|
||||
listener: listener,
|
||||
protocol: self.clone(),
|
||||
shutdown_timeout: Duration::new(1, 0),
|
||||
})
|
||||
}
|
||||
|
||||
/// Bind the provided `addr` and return a server with the default `Handle`.
|
||||
///
|
||||
/// This is method will bind the `addr` provided with a new TCP listener ready
|
||||
/// to accept connections. Each connection will be processed with the
|
||||
/// `new_service` object provided as well, creating a new service per
|
||||
/// connection.
|
||||
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
|
||||
where
|
||||
S: NewService<Request=Request<Body>, Response=Response<Bd>>,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
Bd: Payload,
|
||||
{
|
||||
let handle = Handle::current();
|
||||
let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?;
|
||||
let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?;
|
||||
let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors).map_err(::Error::new_listen)?;
|
||||
if self.keep_alive {
|
||||
incoming.set_keepalive(Some(Duration::from_secs(90)));
|
||||
}
|
||||
Ok(self.serve_incoming(incoming, new_service))
|
||||
}
|
||||
|
||||
/// Bind the provided `addr` and return a server with a shared `Core`.
|
||||
///
|
||||
/// This method allows the ability to share a `Core` with multiple servers.
|
||||
///
|
||||
/// This is method will bind the `addr` provided with a new TCP listener ready
|
||||
/// to accept connections. Each connection will be processed with the
|
||||
/// `new_service` object provided as well, creating a new service per
|
||||
/// connection.
|
||||
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
|
||||
where
|
||||
S: NewService<Request = Request<Body>, Response = Response<Bd>>,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
Bd: Payload,
|
||||
{
|
||||
let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?;
|
||||
let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?;
|
||||
let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors).map_err(::Error::new_listen)?;
|
||||
|
||||
if self.keep_alive {
|
||||
incoming.set_keepalive(Some(Duration::from_secs(90)));
|
||||
}
|
||||
Ok(self.serve_incoming(incoming, new_service))
|
||||
}
|
||||
|
||||
/// Bind the provided stream of incoming IO objects with a `NewService`.
|
||||
///
|
||||
/// This method allows the ability to share a `Core` with multiple servers.
|
||||
pub fn serve_incoming<I, S, Bd>(&self, incoming: I, new_service: S) -> Serve<I, S>
|
||||
where
|
||||
I: Stream<Error=::std::io::Error>,
|
||||
I::Item: AsyncRead + AsyncWrite,
|
||||
S: NewService<Request = Request<Body>, Response = Response<Bd>>,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
Bd: Payload,
|
||||
{
|
||||
Serve {
|
||||
incoming: incoming,
|
||||
new_service: new_service,
|
||||
protocol: self.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Bind a connection together with a Service.
|
||||
///
|
||||
/// This returns a Future that must be polled in order for HTTP to be
|
||||
/// driven on the connection.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # extern crate futures;
|
||||
/// # extern crate hyper;
|
||||
/// # extern crate tokio;
|
||||
/// # extern crate tokio_io;
|
||||
/// # use futures::Future;
|
||||
/// # use hyper::{Body, Request, Response};
|
||||
/// # use hyper::server::{Http, Service};
|
||||
/// # use tokio_io::{AsyncRead, AsyncWrite};
|
||||
/// # use tokio::reactor::Handle;
|
||||
/// # fn run<I, S>(some_io: I, some_service: S)
|
||||
/// # where
|
||||
/// # I: AsyncRead + AsyncWrite + Send + 'static,
|
||||
/// # S: Service<Request=Request<Body>, Response=Response<Body>, Error=hyper::Error> + Send + 'static,
|
||||
/// # S::Future: Send
|
||||
/// # {
|
||||
/// let http = Http::new();
|
||||
/// let conn = http.serve_connection(some_io, some_service);
|
||||
///
|
||||
/// let fut = conn
|
||||
/// .map_err(|e| eprintln!("server connection error: {}", e));
|
||||
///
|
||||
/// tokio::spawn(fut);
|
||||
/// # }
|
||||
/// # fn main() {}
|
||||
/// ```
|
||||
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S>
|
||||
where
|
||||
S: Service<Request = Request<Body>, Response = Response<Bd>>,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
S::Future: Send + 'static,
|
||||
Bd: Payload,
|
||||
I: AsyncRead + AsyncWrite,
|
||||
{
|
||||
let either = if !self.http2 {
|
||||
let mut conn = proto::Conn::new(io);
|
||||
if !self.keep_alive {
|
||||
conn.disable_keep_alive();
|
||||
}
|
||||
conn.set_flush_pipeline(self.pipeline);
|
||||
if let Some(max) = self.max_buf_size {
|
||||
conn.set_max_buf_size(max);
|
||||
}
|
||||
let sd = proto::h1::dispatch::Server::new(service);
|
||||
Either::A(proto::h1::Dispatcher::new(sd, conn))
|
||||
} else {
|
||||
let h2 = proto::h2::Server::new(io, service, self.exec.clone());
|
||||
Either::B(h2)
|
||||
};
|
||||
|
||||
Connection {
|
||||
conn: either,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ===== impl Server =====
|
||||
|
||||
|
||||
/// TODO: add docs
|
||||
pub struct Run(Box<Future<Item=(), Error=::Error> + Send + 'static>);
|
||||
|
||||
impl fmt::Debug for Run {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("Run").finish()
|
||||
impl<I> Server<I, ()> {
|
||||
/// Starts a [`Builder`](Builder) with the provided incoming stream.
|
||||
pub fn builder(incoming: I) -> Builder<I> {
|
||||
Builder {
|
||||
incoming,
|
||||
protocol: Http_::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Run {
|
||||
type Item = ();
|
||||
type Error = ::Error;
|
||||
impl Server<AddrIncoming, ()> {
|
||||
/// Binds to the provided address, and returns a [`Builder`](Builder).
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This method will panic if binding to the address fails. For a method
|
||||
/// to bind to an address and return a `Result`, see `Server::try_bind`.
|
||||
pub fn bind(addr: &SocketAddr) -> Builder<AddrIncoming> {
|
||||
let incoming = AddrIncoming::new(addr, None)
|
||||
.unwrap_or_else(|e| {
|
||||
panic!("error binding to {}: {}", addr, e);
|
||||
});
|
||||
Server::builder(incoming)
|
||||
}
|
||||
|
||||
fn poll(&mut self) -> Poll<(), ::Error> {
|
||||
self.0.poll()
|
||||
/// Tries to bind to the provided address, and returns a [`Builder`](Builder).
|
||||
pub fn try_bind(addr: &SocketAddr) -> ::Result<Builder<AddrIncoming>> {
|
||||
AddrIncoming::new(addr, None)
|
||||
.map(Server::builder)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Server<AddrIncoming, S> {
|
||||
/// Returns the local address that this server is bound to.
|
||||
pub fn local_addr(&self) -> SocketAddr {
|
||||
self.spawn_all.local_addr()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, B> Server<S>
|
||||
impl<I, S, B> Future for Server<I, S>
|
||||
where
|
||||
I: Stream,
|
||||
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
I::Item: AsyncRead + AsyncWrite + Send + 'static,
|
||||
S: NewService<Request = Request<Body>, Response = Response<B>> + Send + 'static,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
<S as NewService>::Instance: Send,
|
||||
<<S as NewService>::Instance as Service>::Future: Send + 'static,
|
||||
B: Payload + Send + 'static,
|
||||
B::Data: Send,
|
||||
{
|
||||
/// Returns the local address that this server is bound to.
|
||||
pub fn local_addr(&self) -> ::Result<SocketAddr> {
|
||||
//TODO: this shouldn't return an error at all, but should get the
|
||||
//local_addr at construction
|
||||
self.listener.local_addr().map_err(::Error::new_io)
|
||||
}
|
||||
|
||||
/// Configure the amount of time this server will wait for a "graceful
|
||||
/// shutdown".
|
||||
///
|
||||
/// This is the amount of time after the shutdown signal is received the
|
||||
/// server will wait for all pending connections to finish. If the timeout
|
||||
/// elapses then the server will be forcibly shut down.
|
||||
///
|
||||
/// This defaults to 1s.
|
||||
pub fn shutdown_timeout(&mut self, timeout: Duration) -> &mut Self {
|
||||
self.shutdown_timeout = timeout;
|
||||
self
|
||||
}
|
||||
|
||||
/// Execute this server infinitely.
|
||||
///
|
||||
/// This method does not currently return, but it will return an error if
|
||||
/// one occurs.
|
||||
pub fn run(self) -> Run {
|
||||
self.run_until(future::empty())
|
||||
}
|
||||
|
||||
/// Execute this server until the given future, `shutdown_signal`, resolves.
|
||||
///
|
||||
/// This method, like `run` above, is used to execute this HTTP server. The
|
||||
/// difference with `run`, however, is that this method allows for shutdown
|
||||
/// in a graceful fashion. The future provided is interpreted as a signal to
|
||||
/// shut down the server when it resolves.
|
||||
///
|
||||
/// This method will block the current thread executing the HTTP server.
|
||||
/// When the `shutdown_signal` has resolved then the TCP listener will be
|
||||
/// unbound (dropped). The thread will continue to block for a maximum of
|
||||
/// `shutdown_timeout` time waiting for active connections to shut down.
|
||||
/// Once the `shutdown_timeout` elapses or all active connections are
|
||||
/// cleaned out then this method will return.
|
||||
pub fn run_until<F>(self, shutdown_signal: F) -> Run
|
||||
where F: Future<Item = (), Error = ()> + Send + 'static,
|
||||
{
|
||||
let Server { protocol, new_service, handle, listener, shutdown_timeout } = self;
|
||||
|
||||
let mut incoming = match AddrIncoming::new(listener, handle.clone(), protocol.sleep_on_errors) {
|
||||
Ok(incoming) => incoming,
|
||||
Err(err) => return Run(Box::new(future::err(::Error::new_listen(err)))),
|
||||
};
|
||||
|
||||
if protocol.keep_alive {
|
||||
incoming.set_keepalive(Some(Duration::from_secs(90)));
|
||||
}
|
||||
|
||||
// Mini future to track the number of active services
|
||||
let info = Arc::new(Mutex::new(Info {
|
||||
active: 0,
|
||||
blocker: None,
|
||||
}));
|
||||
|
||||
// Future for our server's execution
|
||||
let info_cloned = info.clone();
|
||||
let srv = incoming.for_each(move |socket| {
|
||||
let addr = socket.remote_addr;
|
||||
debug!("accepted new connection ({})", addr);
|
||||
|
||||
let service = new_service.new_service()?;
|
||||
let s = NotifyService {
|
||||
inner: service,
|
||||
info: Arc::downgrade(&info_cloned),
|
||||
};
|
||||
info_cloned.lock().unwrap().active += 1;
|
||||
let fut = protocol.serve_connection(socket, s)
|
||||
.map(|_| ())
|
||||
.map_err(move |err| error!("server connection error: ({}) {}", addr, err));
|
||||
spawn(fut);
|
||||
Ok(())
|
||||
});
|
||||
|
||||
// for now, we don't care if the shutdown signal succeeds or errors
|
||||
// as long as it resolves, we will shutdown.
|
||||
let shutdown_signal = shutdown_signal.then(|_| Ok(()));
|
||||
|
||||
// Main execution of the server. Here we use `select` to wait for either
|
||||
// `incoming` or `f` to resolve. We know that `incoming` will never
|
||||
// resolve with a success (it's infinite) so we're actually just waiting
|
||||
// for an error or for `f`, our shutdown signal.
|
||||
//
|
||||
// When we get a shutdown signal (`Ok`) then we drop the TCP listener to
|
||||
// stop accepting incoming connections.
|
||||
let main_execution = shutdown_signal.select(srv).then(move |result| {
|
||||
match result {
|
||||
Ok(((), _incoming)) => {},
|
||||
Err((e, _other)) => return future::Either::A(future::err(::Error::new_accept(e))),
|
||||
}
|
||||
|
||||
// Ok we've stopped accepting new connections at this point, but we want
|
||||
// to give existing connections a chance to clear themselves out. Wait
|
||||
// at most `shutdown_timeout` time before we just return clearing
|
||||
// everything out.
|
||||
//
|
||||
// Our custom `WaitUntilZero` will resolve once all services constructed
|
||||
// here have been destroyed.
|
||||
let timeout = Delay::new(shutdown_timeout);
|
||||
let wait = WaitUntilZero { info: info.clone() };
|
||||
future::Either::B(wait.select(timeout).then(|result| {
|
||||
match result {
|
||||
Ok(_) => Ok(()),
|
||||
//TODO: error variant should be "timed out waiting for graceful shutdown"
|
||||
Err((e, _)) => Err(::Error::new_io(e))
|
||||
}
|
||||
}))
|
||||
});
|
||||
|
||||
Run(Box::new(main_execution))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: fmt::Debug> fmt::Debug for Server<S>
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("Server")
|
||||
.field("listener", &self.listener)
|
||||
.field("new_service", &self.new_service)
|
||||
.field("protocol", &self.protocol)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Serve =====
|
||||
|
||||
impl<I, S> Serve<I, S> {
|
||||
/*
|
||||
/// Spawn all incoming connections onto the provide executor.
|
||||
pub fn spawn_all<E>(self, executor: E) -> SpawnAll<I, S, E> {
|
||||
SpawnAll {
|
||||
executor: executor,
|
||||
serve: self,
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
/// Get a reference to the incoming stream.
|
||||
#[inline]
|
||||
pub fn incoming_ref(&self) -> &I {
|
||||
&self.incoming
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S, B> Stream for Serve<I, S>
|
||||
where
|
||||
I: Stream<Error=io::Error>,
|
||||
I::Item: AsyncRead + AsyncWrite,
|
||||
S: NewService<Request=Request<Body>, Response=Response<B>>,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
<S::Instance as Service>::Future: Send + 'static,
|
||||
B: Payload,
|
||||
{
|
||||
type Item = Connection<I::Item, S::Instance>;
|
||||
type Error = ::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) {
|
||||
let service = self.new_service.new_service().map_err(::Error::new_user_new_service)?;
|
||||
Ok(Async::Ready(Some(self.protocol.serve_connection(io, service))))
|
||||
} else {
|
||||
Ok(Async::Ready(None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl SpawnAll =====
|
||||
|
||||
/*
|
||||
impl<I, S, E> Future for SpawnAll<I, S, E>
|
||||
where
|
||||
I: Stream<Error=io::Error>,
|
||||
I::Item: AsyncRead + AsyncWrite,
|
||||
S: NewService<Request=Request<Body>, Response=Response<B>, Error=::Error>,
|
||||
B: Stream<Error=::Error>,
|
||||
B::Item: AsRef<[u8]>,
|
||||
//E: Executor<Connection<I::Item, S::Instance>>,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
loop {
|
||||
if let Some(conn) = try_ready!(self.serve.poll()) {
|
||||
let fut = conn
|
||||
.map(|_| ())
|
||||
.map_err(|err| debug!("conn error: {}", err));
|
||||
match self.executor.execute(fut) {
|
||||
Ok(()) => (),
|
||||
Err(err) => match err.kind() {
|
||||
ExecuteErrorKind::NoCapacity => {
|
||||
debug!("SpawnAll::poll; executor no capacity");
|
||||
// continue loop
|
||||
},
|
||||
ExecuteErrorKind::Shutdown | _ => {
|
||||
debug!("SpawnAll::poll; executor shutdown");
|
||||
return Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
self.spawn_all.poll()
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// ===== impl AddrIncoming =====
|
||||
|
||||
impl AddrIncoming {
|
||||
fn new(listener: TcpListener, handle: Handle, sleep_on_errors: bool) -> io::Result<AddrIncoming> {
|
||||
Ok(AddrIncoming {
|
||||
addr: listener.local_addr()?,
|
||||
keep_alive_timeout: None,
|
||||
listener: listener,
|
||||
handle: handle,
|
||||
sleep_on_errors: sleep_on_errors,
|
||||
timeout: None,
|
||||
})
|
||||
impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("Server")
|
||||
.field("listener", &self.spawn_all.incoming_ref())
|
||||
.finish()
|
||||
}
|
||||
|
||||
/// Get the local address bound to this listener.
|
||||
pub fn local_addr(&self) -> SocketAddr {
|
||||
self.addr
|
||||
}
|
||||
|
||||
fn set_keepalive(&mut self, dur: Option<Duration>) {
|
||||
self.keep_alive_timeout = dur;
|
||||
}
|
||||
|
||||
/*
|
||||
fn set_sleep_on_errors(&mut self, val: bool) {
|
||||
self.sleep_on_errors = val;
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
impl Stream for AddrIncoming {
|
||||
// currently unnameable...
|
||||
type Item = AddrStream;
|
||||
type Error = ::std::io::Error;
|
||||
// ===== impl Builder =====
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
// Check if a previous timeout is active that was set by IO errors.
|
||||
if let Some(ref mut to) = self.timeout {
|
||||
match to.poll().expect("timeout never fails") {
|
||||
Async::Ready(_) => {}
|
||||
Async::NotReady => return Ok(Async::NotReady),
|
||||
}
|
||||
impl<I> Builder<I> {
|
||||
/// Start a new builder, wrapping an incoming stream and low-level options.
|
||||
///
|
||||
/// For a more convenient constructor, see [`Server::bind`](Server::bind).
|
||||
pub fn new(incoming: I, protocol: Http_) -> Self {
|
||||
Builder {
|
||||
incoming,
|
||||
protocol,
|
||||
}
|
||||
self.timeout = None;
|
||||
loop {
|
||||
match self.listener.poll_accept() {
|
||||
Ok(Async::Ready((socket, addr))) => {
|
||||
if let Some(dur) = self.keep_alive_timeout {
|
||||
if let Err(e) = socket.set_keepalive(Some(dur)) {
|
||||
trace!("error trying to set TCP keepalive: {}", e);
|
||||
}
|
||||
}
|
||||
return Ok(Async::Ready(Some(AddrStream::new(socket, addr))));
|
||||
},
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(ref e) if self.sleep_on_errors => {
|
||||
// Connection errors can be ignored directly, continue by
|
||||
// accepting the next request.
|
||||
if connection_error(e) {
|
||||
continue;
|
||||
}
|
||||
// Sleep 10ms.
|
||||
let delay = ::std::time::Duration::from_millis(10);
|
||||
debug!("accept error: {}; sleeping {:?}",
|
||||
e, delay);
|
||||
let mut timeout = Delay::new(delay);
|
||||
let result = timeout.poll()
|
||||
.expect("timeout never fails");
|
||||
match result {
|
||||
Async::Ready(()) => continue,
|
||||
Async::NotReady => {
|
||||
self.timeout = Some(timeout);
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets whether HTTP/2 is required.
|
||||
///
|
||||
/// Default is `false`.
|
||||
pub fn http2_only(mut self, val: bool) -> Self {
|
||||
self.protocol.http2_only(val);
|
||||
self
|
||||
}
|
||||
|
||||
/// Consume this `Builder`, creating a [`Server`](Server).
|
||||
pub fn serve<S, B>(self, new_service: S) -> Server<I, S>
|
||||
where
|
||||
I: Stream,
|
||||
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
I::Item: AsyncRead + AsyncWrite + Send + 'static,
|
||||
S: NewService<Request = Request<Body>, Response = Response<B>>,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
<S as NewService>::Instance: Send,
|
||||
<<S as NewService>::Instance as Service>::Future: Send + 'static,
|
||||
B: Payload,
|
||||
{
|
||||
let serve = self.protocol.serve_incoming(self.incoming, new_service);
|
||||
let spawn_all = serve.spawn_all();
|
||||
Server {
|
||||
spawn_all,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This function defines errors that are per-connection. Which basically
|
||||
/// means that if we get this error from `accept()` system call it means
|
||||
/// next connection might be ready to be accepted.
|
||||
///
|
||||
/// All other errors will incur a timeout before next `accept()` is performed.
|
||||
/// The timeout is useful to handle resource exhaustion errors like ENFILE
|
||||
/// and EMFILE. Otherwise, could enter into tight loop.
|
||||
fn connection_error(e: &io::Error) -> bool {
|
||||
e.kind() == io::ErrorKind::ConnectionRefused ||
|
||||
e.kind() == io::ErrorKind::ConnectionAborted ||
|
||||
e.kind() == io::ErrorKind::ConnectionReset
|
||||
}
|
||||
|
||||
mod addr_stream {
|
||||
use std::io::{self, Read, Write};
|
||||
use std::net::SocketAddr;
|
||||
use bytes::{Buf, BufMut};
|
||||
use futures::Poll;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AddrStream {
|
||||
inner: TcpStream,
|
||||
pub(super) remote_addr: SocketAddr,
|
||||
impl Builder<AddrIncoming> {
|
||||
/// Set whether TCP keepalive messages are enabled on accepted connections.
|
||||
///
|
||||
/// If `None` is specified, keepalive is disabled, otherwise the duration
|
||||
/// specified will be the time to remain idle before sending TCP keepalive
|
||||
/// probes.
|
||||
pub fn tcp_keepalive(mut self, keepalive: Option<Duration>) -> Self {
|
||||
self.incoming.set_keepalive(keepalive);
|
||||
self
|
||||
}
|
||||
|
||||
impl AddrStream {
|
||||
pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream {
|
||||
AddrStream {
|
||||
inner: tcp,
|
||||
remote_addr: addr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for AddrStream {
|
||||
#[inline]
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.inner.read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for AddrStream {
|
||||
#[inline]
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.inner.write(buf)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush(&mut self ) -> io::Result<()> {
|
||||
self.inner.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for AddrStream {
|
||||
#[inline]
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
self.inner.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
||||
self.inner.read_buf(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for AddrStream {
|
||||
#[inline]
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
AsyncWrite::shutdown(&mut self.inner)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
||||
self.inner.write_buf(buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== NotifyService =====
|
||||
|
||||
struct NotifyService<S> {
|
||||
inner: S,
|
||||
info: Weak<Mutex<Info>>,
|
||||
}
|
||||
|
||||
struct WaitUntilZero {
|
||||
info: Arc<Mutex<Info>>,
|
||||
}
|
||||
|
||||
struct Info {
|
||||
active: usize,
|
||||
blocker: Option<Task>,
|
||||
}
|
||||
|
||||
impl<S: Service> Service for NotifyService<S> {
|
||||
type Request = S::Request;
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn call(&self, message: Self::Request) -> Self::Future {
|
||||
self.inner.call(message)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Drop for NotifyService<S> {
|
||||
fn drop(&mut self) {
|
||||
let info = match self.info.upgrade() {
|
||||
Some(info) => info,
|
||||
None => return,
|
||||
};
|
||||
let mut info = info.lock().unwrap();
|
||||
info.active -= 1;
|
||||
if info.active == 0 {
|
||||
if let Some(task) = info.blocker.take() {
|
||||
task.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for WaitUntilZero {
|
||||
type Item = ();
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<(), io::Error> {
|
||||
let mut info = self.info.lock().unwrap();
|
||||
if info.active == 0 {
|
||||
Ok(().into())
|
||||
} else {
|
||||
info.blocker = Some(task::current());
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
/// Set the value of `TCP_NODELAY` option for accepted connections.
|
||||
pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
|
||||
self.incoming.set_nodelay(enabled);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
234
src/server/tcp.rs
Normal file
234
src/server/tcp.rs
Normal file
@@ -0,0 +1,234 @@
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::{SocketAddr, TcpListener as StdTcpListener};
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use futures_timer::Delay;
|
||||
//TODO: change to tokio_tcp::net::TcpListener
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::reactor::Handle;
|
||||
|
||||
use self::addr_stream::AddrStream;
|
||||
|
||||
/// A stream of connections from binding to an address.
|
||||
#[must_use = "streams do nothing unless polled"]
|
||||
pub struct AddrIncoming {
|
||||
addr: SocketAddr,
|
||||
listener: TcpListener,
|
||||
sleep_on_errors: bool,
|
||||
tcp_keepalive_timeout: Option<Duration>,
|
||||
tcp_nodelay: bool,
|
||||
timeout: Option<Delay>,
|
||||
}
|
||||
|
||||
impl AddrIncoming {
|
||||
pub(super) fn new(addr: &SocketAddr, handle: Option<&Handle>) -> ::Result<AddrIncoming> {
|
||||
let listener = if let Some(handle) = handle {
|
||||
let std_listener = StdTcpListener::bind(addr)
|
||||
.map_err(::Error::new_listen)?;
|
||||
TcpListener::from_std(std_listener, handle)
|
||||
.map_err(::Error::new_listen)?
|
||||
} else {
|
||||
TcpListener::bind(addr).map_err(::Error::new_listen)?
|
||||
};
|
||||
|
||||
let addr = listener.local_addr().map_err(::Error::new_listen)?;
|
||||
|
||||
Ok(AddrIncoming {
|
||||
addr: addr,
|
||||
listener: listener,
|
||||
sleep_on_errors: true,
|
||||
tcp_keepalive_timeout: None,
|
||||
tcp_nodelay: false,
|
||||
timeout: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the local address bound to this listener.
|
||||
pub fn local_addr(&self) -> SocketAddr {
|
||||
self.addr
|
||||
}
|
||||
|
||||
/// Set whether TCP keepalive messages are enabled on accepted connections.
|
||||
///
|
||||
/// If `None` is specified, keepalive is disabled, otherwise the duration
|
||||
/// specified will be the time to remain idle before sending TCP keepalive
|
||||
/// probes.
|
||||
pub fn set_keepalive(&mut self, keepalive: Option<Duration>) -> &mut Self {
|
||||
self.tcp_keepalive_timeout = keepalive;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the value of `TCP_NODELAY` option for accepted connections.
|
||||
pub fn set_nodelay(&mut self, enabled: bool) -> &mut Self {
|
||||
self.tcp_nodelay = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether to sleep on accept errors.
|
||||
///
|
||||
/// A possible scenario is that the process has hit the max open files
|
||||
/// allowed, and so trying to accept a new connection will fail with
|
||||
/// `EMFILE`. In some cases, it's preferable to just wait for some time, if
|
||||
/// the application will likely close some files (or connections), and try
|
||||
/// to accept the connection again. If this option is `true`, the error
|
||||
/// will be logged at the `error` level, since it is still a big deal,
|
||||
/// and then the listener will sleep for 1 second.
|
||||
///
|
||||
/// In other cases, hitting the max open files should be treat similarly
|
||||
/// to being out-of-memory, and simply error (and shutdown). Setting
|
||||
/// this option to `false` will allow that.
|
||||
///
|
||||
/// Default is `true`.
|
||||
pub fn set_sleep_on_errors(&mut self, val: bool) {
|
||||
self.sleep_on_errors = val;
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for AddrIncoming {
|
||||
// currently unnameable...
|
||||
type Item = AddrStream;
|
||||
type Error = ::std::io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
// Check if a previous timeout is active that was set by IO errors.
|
||||
if let Some(ref mut to) = self.timeout {
|
||||
match to.poll().expect("timeout never fails") {
|
||||
Async::Ready(_) => {}
|
||||
Async::NotReady => return Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
self.timeout = None;
|
||||
loop {
|
||||
match self.listener.poll_accept() {
|
||||
Ok(Async::Ready((socket, addr))) => {
|
||||
if let Some(dur) = self.tcp_keepalive_timeout {
|
||||
if let Err(e) = socket.set_keepalive(Some(dur)) {
|
||||
trace!("error trying to set TCP keepalive: {}", e);
|
||||
}
|
||||
}
|
||||
if let Err(e) = socket.set_nodelay(self.tcp_nodelay) {
|
||||
trace!("error trying to set TCP nodelay: {}", e);
|
||||
}
|
||||
return Ok(Async::Ready(Some(AddrStream::new(socket, addr))));
|
||||
},
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(ref e) if self.sleep_on_errors => {
|
||||
// Connection errors can be ignored directly, continue by
|
||||
// accepting the next request.
|
||||
if is_connection_error(e) {
|
||||
debug!("accepted connection already errored: {}", e);
|
||||
continue;
|
||||
}
|
||||
// Sleep 1s.
|
||||
let delay = Duration::from_secs(1);
|
||||
error!("accept error: {}", e);
|
||||
let mut timeout = Delay::new(delay);
|
||||
let result = timeout.poll()
|
||||
.expect("timeout never fails");
|
||||
match result {
|
||||
Async::Ready(()) => continue,
|
||||
Async::NotReady => {
|
||||
self.timeout = Some(timeout);
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This function defines errors that are per-connection. Which basically
|
||||
/// means that if we get this error from `accept()` system call it means
|
||||
/// next connection might be ready to be accepted.
|
||||
///
|
||||
/// All other errors will incur a timeout before next `accept()` is performed.
|
||||
/// The timeout is useful to handle resource exhaustion errors like ENFILE
|
||||
/// and EMFILE. Otherwise, could enter into tight loop.
|
||||
fn is_connection_error(e: &io::Error) -> bool {
|
||||
e.kind() == io::ErrorKind::ConnectionRefused ||
|
||||
e.kind() == io::ErrorKind::ConnectionAborted ||
|
||||
e.kind() == io::ErrorKind::ConnectionReset
|
||||
}
|
||||
|
||||
impl fmt::Debug for AddrIncoming {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("AddrIncoming")
|
||||
.field("addr", &self.addr)
|
||||
.field("sleep_on_errors", &self.sleep_on_errors)
|
||||
.field("tcp_keepalive_timeout", &self.tcp_keepalive_timeout)
|
||||
.field("tcp_nodelay", &self.tcp_nodelay)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
mod addr_stream {
|
||||
use std::io::{self, Read, Write};
|
||||
use std::net::SocketAddr;
|
||||
use bytes::{Buf, BufMut};
|
||||
use futures::Poll;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AddrStream {
|
||||
inner: TcpStream,
|
||||
pub(super) remote_addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl AddrStream {
|
||||
pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream {
|
||||
AddrStream {
|
||||
inner: tcp,
|
||||
remote_addr: addr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for AddrStream {
|
||||
#[inline]
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.inner.read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for AddrStream {
|
||||
#[inline]
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.inner.write(buf)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush(&mut self ) -> io::Result<()> {
|
||||
self.inner.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for AddrStream {
|
||||
#[inline]
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
self.inner.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
||||
self.inner.read_buf(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for AddrStream {
|
||||
#[inline]
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
AsyncWrite::shutdown(&mut self.inner)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
||||
self.inner.write_buf(buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
#![deny(warnings)]
|
||||
#[macro_use]
|
||||
mod support;
|
||||
use self::support::*;
|
||||
|
||||
@@ -24,7 +24,6 @@ use futures::future::{self, FutureResult, Either};
|
||||
use futures::sync::oneshot;
|
||||
use futures_timer::Delay;
|
||||
use http::header::{HeaderName, HeaderValue};
|
||||
//use net2::TcpBuilder;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::reactor::Handle;
|
||||
@@ -32,7 +31,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use hyper::server::{Http, Service, NewService, service_fn};
|
||||
use hyper::server::{Service, NewService, service_fn};
|
||||
use hyper::server::conn::Http;
|
||||
|
||||
fn tcp_bind(addr: &SocketAddr, handle: &Handle) -> ::tokio::io::Result<TcpListener> {
|
||||
let std_listener = StdTcpListener::bind(addr).unwrap();
|
||||
@@ -1363,8 +1363,9 @@ fn serve_with_options(options: ServeOptions) -> Serve {
|
||||
let (msg_tx, msg_rx) = mpsc::channel();
|
||||
let (reply_tx, reply_rx) = spmc::channel();
|
||||
let (shutdown_tx, shutdown_rx) = oneshot::channel();
|
||||
let shutdown_rx = shutdown_rx.then(|_| Ok(()));
|
||||
|
||||
let addr = "127.0.0.1:0".parse().unwrap();
|
||||
let addr = ([127, 0, 0, 1], 0).into();
|
||||
|
||||
let keep_alive = !options.keep_alive_disabled;
|
||||
let pipeline = options.pipeline;
|
||||
@@ -1372,22 +1373,40 @@ fn serve_with_options(options: ServeOptions) -> Serve {
|
||||
|
||||
let thread_name = format!("test-server-{:?}", dur);
|
||||
let thread = thread::Builder::new().name(thread_name).spawn(move || {
|
||||
tokio::run(::futures::future::lazy(move || {
|
||||
let srv = Http::new()
|
||||
.keep_alive(keep_alive)
|
||||
.pipeline(pipeline)
|
||||
.bind(&addr, TestService {
|
||||
tx: Arc::new(Mutex::new(msg_tx.clone())),
|
||||
_timeout: dur,
|
||||
reply: reply_rx,
|
||||
}).unwrap();
|
||||
addr_tx.send(srv.local_addr().unwrap()).unwrap();
|
||||
srv.run_until(shutdown_rx.then(|_| Ok(())))
|
||||
.map_err(|err| println!("error {}", err))
|
||||
}))
|
||||
}).unwrap();
|
||||
let serve = Http::new()
|
||||
.keep_alive(keep_alive)
|
||||
.pipeline_flush(pipeline)
|
||||
.serve_addr(&addr, TestService {
|
||||
tx: Arc::new(Mutex::new(msg_tx.clone())),
|
||||
_timeout: dur,
|
||||
reply: reply_rx,
|
||||
})
|
||||
.expect("bind to address");
|
||||
|
||||
let addr = addr_rx.recv().unwrap();
|
||||
addr_tx.send(
|
||||
serve
|
||||
.incoming_ref()
|
||||
.local_addr()
|
||||
).expect("server addr tx");
|
||||
|
||||
// spawn_all() is private for now, so just duplicate it here
|
||||
let spawn_all = serve.for_each(|conn| {
|
||||
tokio::spawn(conn.map_err(|e| {
|
||||
println!("server error: {}", e);
|
||||
}));
|
||||
Ok(())
|
||||
}).map_err(|e| {
|
||||
println!("accept error: {}", e)
|
||||
});
|
||||
|
||||
let fut = spawn_all
|
||||
.select(shutdown_rx)
|
||||
.then(|_| Ok(()));
|
||||
|
||||
tokio::run(fut);
|
||||
}).expect("thread spawn");
|
||||
|
||||
let addr = addr_rx.recv().expect("server addr rx");
|
||||
|
||||
Serve {
|
||||
msg_rx: msg_rx,
|
||||
|
||||
@@ -123,9 +123,11 @@ macro_rules! __internal_req_res_prop {
|
||||
);
|
||||
(headers: $map:tt) => ({
|
||||
#[allow(unused_mut)]
|
||||
{
|
||||
let mut headers = HeaderMap::new();
|
||||
__internal_headers!(headers, $map);
|
||||
headers
|
||||
}
|
||||
});
|
||||
($prop_name:ident: $prop_val:expr) => (
|
||||
From::from($prop_val)
|
||||
@@ -228,7 +230,7 @@ pub fn __run_test(cfg: __TestConfig) {
|
||||
});
|
||||
let new_service = hyper::server::const_service(service);
|
||||
|
||||
let serve = hyper::server::Http::new()
|
||||
let serve = hyper::server::conn::Http::new()
|
||||
.http2_only(cfg.server_version == 2)
|
||||
.executor(rt.executor())
|
||||
.serve_addr_handle(
|
||||
@@ -244,7 +246,7 @@ pub fn __run_test(cfg: __TestConfig) {
|
||||
let (success_tx, success_rx) = oneshot::channel();
|
||||
let expected_connections = cfg.connections;
|
||||
let server = serve
|
||||
.fold(0, move |cnt, conn: hyper::server::Connection<_, _>| {
|
||||
.fold(0, move |cnt, conn| {
|
||||
exe.spawn(conn.map_err(|e| panic!("server connection error: {}", e)));
|
||||
Ok::<_, hyper::Error>(cnt + 1)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user