refactor(server): expose Http that implements ServerProto
The main changes are: * The entry point is how `Http`, the implementation of `ServerProto`. This type has a `new` constructor as well as builder methods to configure it. * A high-level entry point of `Http::bind` was added which returns a `Server`. Binding a protocol to a port requires a socket address (where to bind) as well as the instance of `NewService`. Internally this creates a core and a TCP listener. * The returned `Server` has a few methods to learn about itself, e.g. `local_addr` and `handle`, but mainly has two methods: `run` and `run_until`. * The `Server::run` entry point will execute a server infinitely, never having it exit. * The `Server::run_until` method is intended as a graceful shutdown mechanism. When the provided future resolves the server stops accepting connections immediately and then waits for a fixed period of time for all active connections to get torn down, after which the whole server is torn down anyway. * Finally a `Http::bind_connection` method exists as a low-level entry point to spawning a server connection. This is used by `Server::run` as is intended for external use in other event loops if necessary or otherwise low-level needs. BREAKING CHANGE: `Server` is no longer the pimary entry point. Instead, an `Http` type is created and then either `bind` to receiver a `Server`, or it can be passed to other Tokio things.
This commit is contained in:
committed by
Sean McArthur
parent
39a53fcd33
commit
f45e9c8e4f
@@ -3,13 +3,15 @@
|
|||||||
|
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
extern crate tokio_core;
|
|
||||||
extern crate pretty_env_logger;
|
extern crate pretty_env_logger;
|
||||||
|
|
||||||
extern crate test;
|
extern crate test;
|
||||||
|
extern crate tokio_core;
|
||||||
|
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
use futures::{Future, Stream};
|
use futures::{Future, Stream};
|
||||||
use tokio_core::reactor::Core;
|
use tokio_core::reactor::{Core, Handle};
|
||||||
|
use tokio_core::net::TcpListener;
|
||||||
|
|
||||||
use hyper::client;
|
use hyper::client;
|
||||||
use hyper::header::{ContentLength, ContentType};
|
use hyper::header::{ContentLength, ContentType};
|
||||||
@@ -22,9 +24,7 @@ fn get_one_at_a_time(b: &mut test::Bencher) {
|
|||||||
let _ = pretty_env_logger::init();
|
let _ = pretty_env_logger::init();
|
||||||
let mut core = Core::new().unwrap();
|
let mut core = Core::new().unwrap();
|
||||||
let handle = core.handle();
|
let handle = core.handle();
|
||||||
|
let addr = spawn_hello(&handle);
|
||||||
let addr = hyper::Server::http(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap()
|
|
||||||
.handle(|| Ok(Hello), &handle).unwrap();
|
|
||||||
|
|
||||||
let client = hyper::Client::new(&handle);
|
let client = hyper::Client::new(&handle);
|
||||||
|
|
||||||
@@ -47,9 +47,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
|
|||||||
let _ = pretty_env_logger::init();
|
let _ = pretty_env_logger::init();
|
||||||
let mut core = Core::new().unwrap();
|
let mut core = Core::new().unwrap();
|
||||||
let handle = core.handle();
|
let handle = core.handle();
|
||||||
|
let addr = spawn_hello(&handle);
|
||||||
let addr = hyper::Server::http(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap()
|
|
||||||
.handle(|| Ok(Hello), &handle).unwrap();
|
|
||||||
|
|
||||||
let client = hyper::Client::new(&handle);
|
let client = hyper::Client::new(&handle);
|
||||||
|
|
||||||
@@ -92,3 +90,17 @@ impl Service for Hello {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn spawn_hello(handle: &Handle) -> SocketAddr {
|
||||||
|
let addr = "127.0.0.1:0".parse().unwrap();
|
||||||
|
let listener = TcpListener::bind(&addr, handle).unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
let handle2 = handle.clone();
|
||||||
|
handle.spawn(listener.incoming().for_each(move |(socket, addr)| {
|
||||||
|
let http = hyper::server::Http::new();
|
||||||
|
http.bind_connection(&handle2, socket, addr, Hello);
|
||||||
|
Ok(())
|
||||||
|
}).then(|_| Ok(())));
|
||||||
|
return addr
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ extern crate pretty_env_logger;
|
|||||||
//extern crate num_cpus;
|
//extern crate num_cpus;
|
||||||
|
|
||||||
use hyper::header::{ContentLength, ContentType};
|
use hyper::header::{ContentLength, ContentType};
|
||||||
use hyper::server::{Server, Service, Request, Response};
|
use hyper::server::{Http, Service, Request, Response};
|
||||||
|
|
||||||
static PHRASE: &'static [u8] = b"Hello World!";
|
static PHRASE: &'static [u8] = b"Hello World!";
|
||||||
|
|
||||||
@@ -31,9 +31,7 @@ impl Service for Hello {
|
|||||||
fn main() {
|
fn main() {
|
||||||
pretty_env_logger::init().unwrap();
|
pretty_env_logger::init().unwrap();
|
||||||
let addr = "127.0.0.1:3000".parse().unwrap();
|
let addr = "127.0.0.1:3000".parse().unwrap();
|
||||||
let _server = Server::standalone(|tokio| {
|
let server = Http::new().bind(&addr, || Ok(Hello)).unwrap();
|
||||||
Server::http(&addr, tokio)?
|
println!("Listening on http://{}", server.local_addr().unwrap());
|
||||||
.handle(|| Ok(Hello), tokio)
|
server.run().unwrap();
|
||||||
}).unwrap();
|
|
||||||
println!("Listening on http://{}", addr);
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,8 +7,7 @@ extern crate log;
|
|||||||
|
|
||||||
use hyper::{Get, Post, StatusCode};
|
use hyper::{Get, Post, StatusCode};
|
||||||
use hyper::header::ContentLength;
|
use hyper::header::ContentLength;
|
||||||
use hyper::server::{Server, Service, Request, Response};
|
use hyper::server::{Http, Service, Request, Response};
|
||||||
|
|
||||||
|
|
||||||
static INDEX: &'static [u8] = b"Try POST /echo";
|
static INDEX: &'static [u8] = b"Try POST /echo";
|
||||||
|
|
||||||
@@ -48,10 +47,8 @@ impl Service for Echo {
|
|||||||
fn main() {
|
fn main() {
|
||||||
pretty_env_logger::init().unwrap();
|
pretty_env_logger::init().unwrap();
|
||||||
let addr = "127.0.0.1:1337".parse().unwrap();
|
let addr = "127.0.0.1:1337".parse().unwrap();
|
||||||
let (listening, server) = Server::standalone(|tokio| {
|
|
||||||
Server::http(&addr, tokio)?
|
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
|
||||||
.handle(|| Ok(Echo), tokio)
|
println!("Listening on http://{}", server.local_addr().unwrap());
|
||||||
}).unwrap();
|
server.run().unwrap();
|
||||||
println!("Listening on http://{}", listening);
|
|
||||||
server.run();
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,7 @@
|
|||||||
//! [Server](server/index.html), along with a
|
//! [Server](server/index.html), along with a
|
||||||
//! [typed Headers system](header/index.html).
|
//! [typed Headers system](header/index.html).
|
||||||
|
|
||||||
extern crate futures;
|
#[macro_use] extern crate futures;
|
||||||
extern crate futures_cpupool;
|
extern crate futures_cpupool;
|
||||||
extern crate httparse;
|
extern crate httparse;
|
||||||
#[macro_use] extern crate language_tags;
|
#[macro_use] extern crate language_tags;
|
||||||
|
|||||||
@@ -2,23 +2,26 @@
|
|||||||
//!
|
//!
|
||||||
//! A `Server` is created to listen on a port, parse HTTP requests, and hand
|
//! A `Server` is created to listen on a port, parse HTTP requests, and hand
|
||||||
//! them off to a `Service`.
|
//! them off to a `Service`.
|
||||||
|
|
||||||
|
use std::cell::RefCell;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::net::{SocketAddr, TcpListener as StdTcpListener};
|
use std::net::SocketAddr;
|
||||||
|
use std::rc::{Rc, Weak};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures::{Future, Map};
|
use futures::future;
|
||||||
use futures::stream::{Stream};
|
use futures::task::{self, Task};
|
||||||
use futures::sync::oneshot;
|
use futures::{Future, Map, Stream, Poll, Async, Sink, StartSend, AsyncSink};
|
||||||
|
|
||||||
use tokio::io::Io;
|
use tokio::io::Io;
|
||||||
|
use tokio::reactor::{Core, Handle, Timeout};
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use tokio::reactor::{Core, Handle};
|
|
||||||
use tokio_proto::BindServer;
|
use tokio_proto::BindServer;
|
||||||
use tokio_proto::streaming::Message;
|
use tokio_proto::streaming::Message;
|
||||||
use tokio_proto::streaming::pipeline::ServerProto;
|
use tokio_proto::streaming::pipeline::{Transport, Frame, ServerProto};
|
||||||
pub use tokio_service::{NewService, Service};
|
pub use tokio_service::{NewService, Service};
|
||||||
|
|
||||||
pub use self::accept::Accept;
|
|
||||||
pub use self::request::Request;
|
pub use self::request::Request;
|
||||||
pub use self::response::Response;
|
pub use self::response::Response;
|
||||||
|
|
||||||
@@ -27,211 +30,125 @@ use http;
|
|||||||
mod request;
|
mod request;
|
||||||
mod response;
|
mod response;
|
||||||
|
|
||||||
type HttpIncoming = ::tokio::net::Incoming;
|
/// An instance of the HTTP protocol, and implementation of tokio-proto's
|
||||||
|
/// `ServerProto` trait.
|
||||||
/// A Server that can accept incoming network requests.
|
///
|
||||||
#[derive(Debug)]
|
/// This structure is used to create instances of `Server` or to spawn off tasks
|
||||||
pub struct Server<A> {
|
/// which handle a connection to an HTTP server. Each instance of `Http` can be
|
||||||
accepter: A,
|
/// configured with various protocol-level options such as keepalive.
|
||||||
addr: SocketAddr,
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Http {
|
||||||
keep_alive: bool,
|
keep_alive: bool,
|
||||||
//idle_timeout: Option<Duration>,
|
|
||||||
//max_sockets: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A: Accept> Server<A> {
|
/// An instance of a server created through `Http::bind`.
|
||||||
/// Creates a new Server from a Stream of Ios.
|
///
|
||||||
///
|
/// This server is intended as a convenience for creating a TCP listener on an
|
||||||
/// The addr is the socket address the accepter is listening on.
|
/// address and then serving TCP connections accepted with the service provided.
|
||||||
pub fn new(accepter: A, addr: SocketAddr) -> Server<A> {
|
pub struct Server<S> {
|
||||||
Server {
|
protocol: Http,
|
||||||
accepter: accepter,
|
new_service: S,
|
||||||
addr: addr,
|
core: Core,
|
||||||
|
listener: TcpListener,
|
||||||
|
shutdown_timeout: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Http {
|
||||||
|
/// Creates a new instance of the HTTP protocol, ready to spawn a server or
|
||||||
|
/// start accepting connections.
|
||||||
|
pub fn new() -> Http {
|
||||||
|
Http {
|
||||||
keep_alive: true,
|
keep_alive: true,
|
||||||
//idle_timeout: Some(Duration::from_secs(75)),
|
|
||||||
//max_sockets: 4096,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Enables or disables HTTP keep-alive.
|
/// Enables or disables HTTP keep-alive.
|
||||||
///
|
///
|
||||||
/// Default is true.
|
/// Default is true.
|
||||||
pub fn keep_alive(mut self, val: bool) -> Server<A> {
|
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
|
||||||
self.keep_alive = val;
|
self.keep_alive = val;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/// Bind the provided `addr` and return a server ready to handle
|
||||||
/// Sets how long an idle connection will be kept before closing.
|
/// connections.
|
||||||
///
|
///
|
||||||
/// Default is 75 seconds.
|
/// This method will bind the `addr` provided with a new TCP listener ready
|
||||||
pub fn idle_timeout(mut self, val: Option<Duration>) -> Server<A> {
|
/// to accept connections. Each connection will be processed with the
|
||||||
self.idle_timeout = val;
|
/// `new_service` object provided as well, creating a new service per
|
||||||
self
|
/// connection.
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
/*
|
|
||||||
/// Sets the maximum open sockets for this Server.
|
|
||||||
///
|
///
|
||||||
/// Default is 4096, but most servers can handle much more than this.
|
/// The returned `Server` contains one method, `run`, which is used to
|
||||||
pub fn max_sockets(mut self, val: usize) -> Server<A> {
|
/// actually run the server.
|
||||||
self.max_sockets = val;
|
pub fn bind<S>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Server<S>>
|
||||||
self
|
where S: NewService<Request = Request, Response = Response, Error = ::Error> +
|
||||||
}
|
Send + Sync + 'static,
|
||||||
*/
|
{
|
||||||
}
|
|
||||||
|
|
||||||
impl Server<HttpIncoming> {
|
|
||||||
/// Creates a new HTTP server config listening on the provided address.
|
|
||||||
pub fn http(addr: &SocketAddr, handle: &Handle) -> ::Result<Server<HttpIncoming>> {
|
|
||||||
let listener = try!(StdTcpListener::bind(addr));
|
|
||||||
let addr = try!(listener.local_addr());
|
|
||||||
let listener = try!(TcpListener::from_listener(listener, &addr, handle));
|
|
||||||
Ok(Server::new(listener.incoming(), addr))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
impl<S: SslServer> Server<HttpsListener<S>> {
|
|
||||||
/// Creates a new server config that will handle `HttpStream`s over SSL.
|
|
||||||
///
|
|
||||||
/// You can use any SSL implementation, as long as it implements `hyper::net::Ssl`.
|
|
||||||
pub fn https(addr: &SocketAddr, ssl: S) -> ::Result<Server<HttpsListener<S>>> {
|
|
||||||
HttpsListener::new(addr, ssl)
|
|
||||||
.map(Server::new)
|
|
||||||
.map_err(From::from)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
|
||||||
impl<A: Accept> Server<A> {
|
|
||||||
/// Binds to a socket and starts handling connections.
|
|
||||||
pub fn handle<H>(self, factory: H, handle: &Handle) -> ::Result<SocketAddr>
|
|
||||||
where H: NewService<Request=Request, Response=Response, Error=::Error> + 'static {
|
|
||||||
let binder = HttpServer {
|
|
||||||
keep_alive: self.keep_alive,
|
|
||||||
};
|
|
||||||
let inner_handle = handle.clone();
|
|
||||||
handle.spawn(self.accepter.accept().for_each(move |(socket, remote_addr)| {
|
|
||||||
let service = HttpService {
|
|
||||||
inner: try!(factory.new_service()),
|
|
||||||
remote_addr: remote_addr,
|
|
||||||
};
|
|
||||||
binder.bind_server(&inner_handle, socket, service);
|
|
||||||
Ok(())
|
|
||||||
}).map_err(|e| {
|
|
||||||
error!("listener io error: {:?}", e);
|
|
||||||
()
|
|
||||||
}));
|
|
||||||
|
|
||||||
Ok(self.addr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Server<()> {
|
|
||||||
/// Create a server that owns its event loop.
|
|
||||||
///
|
|
||||||
/// The returned `ServerLoop` can be used to run the loop forever in the
|
|
||||||
/// thread. The returned `Listening` can be sent to another thread, and
|
|
||||||
/// used to shutdown the `ServerLoop`.
|
|
||||||
pub fn standalone<F>(closure: F) -> ::Result<(Listening, ServerLoop)>
|
|
||||||
where F: FnOnce(&Handle) -> ::Result<SocketAddr> {
|
|
||||||
let core = try!(Core::new());
|
let core = try!(Core::new());
|
||||||
let handle = core.handle();
|
let handle = core.handle();
|
||||||
let addr = try!(closure(&handle));
|
let listener = try!(TcpListener::bind(addr, &handle));
|
||||||
let (shutdown_tx, shutdown_rx) = oneshot::channel();
|
|
||||||
Ok((
|
|
||||||
Listening {
|
|
||||||
addr: addr,
|
|
||||||
shutdown: shutdown_tx,
|
|
||||||
},
|
|
||||||
ServerLoop {
|
|
||||||
inner: Some((core, shutdown_rx)),
|
|
||||||
}
|
|
||||||
))
|
|
||||||
|
|
||||||
|
Ok(Server {
|
||||||
|
new_service: new_service,
|
||||||
|
core: core,
|
||||||
|
listener: listener,
|
||||||
|
protocol: self.clone(),
|
||||||
|
shutdown_timeout: Duration::new(1, 0),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/// A configured `Server` ready to run.
|
/// Use this `Http` instance to create a new server task which handles the
|
||||||
pub struct ServerLoop {
|
/// connection `io` provided.
|
||||||
inner: Option<(Core, oneshot::Receiver<()>)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for ServerLoop {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
f.pad("ServerLoop")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServerLoop {
|
|
||||||
/// Runs the server forever in this loop.
|
|
||||||
///
|
///
|
||||||
/// This will block the current thread.
|
/// This is the low-level method used to actually spawn handling a TCP
|
||||||
pub fn run(self) {
|
/// connection, typically. The `handle` provided is the event loop on which
|
||||||
// drop will take care of it.
|
/// the server task will be spawned, `io` is the I/O object associated with
|
||||||
trace!("ServerLoop::run()");
|
/// this connection (data that's read/written), `remote_addr` is the remote
|
||||||
|
/// peer address of the HTTP client, and `service` defines how HTTP requests
|
||||||
|
/// will be handled (and mapped to responses).
|
||||||
|
///
|
||||||
|
/// This method is typically not invoked directly but is rather transitively
|
||||||
|
/// used through the `serve` helper method above. This can be useful,
|
||||||
|
/// however, when writing mocks or accepting sockets from a non-TCP
|
||||||
|
/// location.
|
||||||
|
pub fn bind_connection<S, I>(&self,
|
||||||
|
handle: &Handle,
|
||||||
|
io: I,
|
||||||
|
remote_addr: SocketAddr,
|
||||||
|
service: S)
|
||||||
|
where S: Service<Request = Request, Response = Response, Error = ::Error> + 'static,
|
||||||
|
I: Io + 'static,
|
||||||
|
{
|
||||||
|
self.bind_server(handle, io, HttpService {
|
||||||
|
inner: service,
|
||||||
|
remote_addr: remote_addr,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for ServerLoop {
|
#[doc(hidden)]
|
||||||
fn drop(&mut self) {
|
#[allow(missing_debug_implementations)]
|
||||||
self.inner.take().map(|(mut loop_, shutdown)| {
|
pub struct ProtoRequest(http::RequestHead);
|
||||||
debug!("ServerLoop::drop running");
|
#[doc(hidden)]
|
||||||
let _ = loop_.run(shutdown.or_else(|_dropped| ::futures::future::empty::<(), oneshot::Canceled>()));
|
#[allow(missing_debug_implementations)]
|
||||||
debug!("Server closed");
|
pub struct ProtoResponse(ResponseHead);
|
||||||
});
|
#[doc(hidden)]
|
||||||
}
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct ProtoTransport<T>(http::Conn<T, http::ServerTransaction>);
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct ProtoBindTransport<T> {
|
||||||
|
inner: future::FutureResult<http::Conn<T, http::ServerTransaction>, io::Error>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A handle of the running server.
|
impl<T: Io + 'static> ServerProto<T> for Http {
|
||||||
pub struct Listening {
|
type Request = ProtoRequest;
|
||||||
addr: SocketAddr,
|
|
||||||
shutdown: ::futures::sync::oneshot::Sender<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for Listening {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
f.debug_struct("Listening")
|
|
||||||
.field("addr", &self.addr)
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for Listening {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
fmt::Display::fmt(&self.addr, f)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Listening {
|
|
||||||
/// The addresses this server is listening on.
|
|
||||||
pub fn addr(&self) -> &SocketAddr {
|
|
||||||
&self.addr
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Stop the server from listening to its socket address.
|
|
||||||
pub fn close(self) {
|
|
||||||
debug!("closing server {}", self);
|
|
||||||
self.shutdown.complete(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct HttpServer {
|
|
||||||
keep_alive: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Io + 'static> ServerProto<T> for HttpServer {
|
|
||||||
type Request = http::RequestHead;
|
|
||||||
type RequestBody = http::Chunk;
|
type RequestBody = http::Chunk;
|
||||||
type Response = ResponseHead;
|
type Response = ProtoResponse;
|
||||||
type ResponseBody = http::Chunk;
|
type ResponseBody = http::Chunk;
|
||||||
type Error = ::Error;
|
type Error = ::Error;
|
||||||
type Transport = http::Conn<T, http::ServerTransaction>;
|
type Transport = ProtoTransport<T>;
|
||||||
type BindTransport = io::Result<http::Conn<T, http::ServerTransaction>>;
|
type BindTransport = ProtoBindTransport<T>;
|
||||||
|
|
||||||
fn bind_transport(&self, io: T) -> Self::BindTransport {
|
fn bind_transport(&self, io: T) -> Self::BindTransport {
|
||||||
let ka = if self.keep_alive {
|
let ka = if self.keep_alive {
|
||||||
@@ -239,7 +156,83 @@ impl<T: Io + 'static> ServerProto<T> for HttpServer {
|
|||||||
} else {
|
} else {
|
||||||
http::KA::Disabled
|
http::KA::Disabled
|
||||||
};
|
};
|
||||||
Ok(http::Conn::new(io, ka))
|
ProtoBindTransport {
|
||||||
|
inner: future::ok(http::Conn::new(io, ka)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Io + 'static> Sink for ProtoTransport<T> {
|
||||||
|
type SinkItem = Frame<ProtoResponse, http::Chunk, ::Error>;
|
||||||
|
type SinkError = io::Error;
|
||||||
|
|
||||||
|
fn start_send(&mut self, item: Self::SinkItem)
|
||||||
|
-> StartSend<Self::SinkItem, io::Error> {
|
||||||
|
let item = match item {
|
||||||
|
Frame::Message { message, body } => {
|
||||||
|
Frame::Message { message: message.0, body: body }
|
||||||
|
}
|
||||||
|
Frame::Body { chunk } => Frame::Body { chunk: chunk },
|
||||||
|
Frame::Error { error } => Frame::Error { error: error },
|
||||||
|
};
|
||||||
|
match try!(self.0.start_send(item)) {
|
||||||
|
AsyncSink::Ready => Ok(AsyncSink::Ready),
|
||||||
|
AsyncSink::NotReady(Frame::Message { message, body }) => {
|
||||||
|
Ok(AsyncSink::NotReady(Frame::Message {
|
||||||
|
message: ProtoResponse(message),
|
||||||
|
body: body,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
AsyncSink::NotReady(Frame::Body { chunk }) => {
|
||||||
|
Ok(AsyncSink::NotReady(Frame::Body { chunk: chunk }))
|
||||||
|
}
|
||||||
|
AsyncSink::NotReady(Frame::Error { error }) => {
|
||||||
|
Ok(AsyncSink::NotReady(Frame::Error { error: error }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_complete(&mut self) -> Poll<(), io::Error> {
|
||||||
|
self.0.poll_complete()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Io + 'static> Stream for ProtoTransport<T> {
|
||||||
|
type Item = Frame<ProtoRequest, http::Chunk, ::Error>;
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
|
||||||
|
let item = match try_ready!(self.0.poll()) {
|
||||||
|
Some(item) => item,
|
||||||
|
None => return Ok(None.into()),
|
||||||
|
};
|
||||||
|
let item = match item {
|
||||||
|
Frame::Message { message, body } => {
|
||||||
|
Frame::Message { message: ProtoRequest(message), body: body }
|
||||||
|
}
|
||||||
|
Frame::Body { chunk } => Frame::Body { chunk: chunk },
|
||||||
|
Frame::Error { error } => Frame::Error { error: error },
|
||||||
|
};
|
||||||
|
Ok(Some(item).into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Io + 'static> Transport for ProtoTransport<T> {
|
||||||
|
fn tick(&mut self) {
|
||||||
|
self.0.tick()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cancel(&mut self) -> io::Result<()> {
|
||||||
|
self.0.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Io + 'static> Future for ProtoBindTransport<T> {
|
||||||
|
type Item = ProtoTransport<T>;
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<ProtoTransport<T>, io::Error> {
|
||||||
|
self.inner.poll().map(|a| a.map(ProtoTransport))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -248,12 +241,12 @@ struct HttpService<T> {
|
|||||||
remote_addr: SocketAddr,
|
remote_addr: SocketAddr,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn map_response_to_message(res: Response) -> Message<ResponseHead, http::TokioBody> {
|
fn map_response_to_message(res: Response) -> Message<ProtoResponse, http::TokioBody> {
|
||||||
let (head, body) = response::split(res);
|
let (head, body) = response::split(res);
|
||||||
if let Some(body) = body {
|
if let Some(body) = body {
|
||||||
Message::WithBody(head, body.into())
|
Message::WithBody(ProtoResponse(head), body.into())
|
||||||
} else {
|
} else {
|
||||||
Message::WithoutBody(head)
|
Message::WithoutBody(ProtoResponse(head))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -262,69 +255,184 @@ type ResponseHead = http::MessageHead<::StatusCode>;
|
|||||||
impl<T> Service for HttpService<T>
|
impl<T> Service for HttpService<T>
|
||||||
where T: Service<Request=Request, Response=Response, Error=::Error>,
|
where T: Service<Request=Request, Response=Response, Error=::Error>,
|
||||||
{
|
{
|
||||||
type Request = Message<http::RequestHead, http::TokioBody>;
|
type Request = Message<ProtoRequest, http::TokioBody>;
|
||||||
type Response = Message<ResponseHead, http::TokioBody>;
|
type Response = Message<ProtoResponse, http::TokioBody>;
|
||||||
type Error = ::Error;
|
type Error = ::Error;
|
||||||
type Future = Map<T::Future, fn(Response) -> Message<ResponseHead, http::TokioBody>>;
|
type Future = Map<T::Future, fn(Response) -> Message<ProtoResponse, http::TokioBody>>;
|
||||||
|
|
||||||
fn call(&self, message: Self::Request) -> Self::Future {
|
fn call(&self, message: Self::Request) -> Self::Future {
|
||||||
let (head, body) = match message {
|
let (head, body) = match message {
|
||||||
Message::WithoutBody(head) => (head, http::Body::empty()),
|
Message::WithoutBody(head) => (head.0, http::Body::empty()),
|
||||||
Message::WithBody(head, body) => (head, body.into()),
|
Message::WithBody(head, body) => (head.0, body.into()),
|
||||||
};
|
};
|
||||||
let req = request::new(self.remote_addr, head, body);
|
let req = request::new(self.remote_addr, head, body);
|
||||||
self.inner.call(req).map(map_response_to_message)
|
self.inner.call(req).map(map_response_to_message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//private so the `Acceptor` type can stay internal
|
impl<S> Server<S>
|
||||||
mod accept {
|
where S: NewService<Request = Request, Response = Response, Error = ::Error>
|
||||||
use std::io;
|
+ Send + Sync + 'static,
|
||||||
use std::net::SocketAddr;
|
{
|
||||||
use futures::{Stream, Poll};
|
/// Returns the local address that this server is bound to.
|
||||||
use tokio::io::Io;
|
pub fn local_addr(&self) -> ::Result<SocketAddr> {
|
||||||
|
Ok(try!(self.listener.local_addr()))
|
||||||
|
}
|
||||||
|
|
||||||
/// An Acceptor is an incoming Stream of Io.
|
/// Returns a handle to the underlying event loop that this server will be
|
||||||
|
/// running on.
|
||||||
|
pub fn handle(&self) -> Handle {
|
||||||
|
self.core.handle()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configure the amount of time this server will wait for a "graceful
|
||||||
|
/// shutdown".
|
||||||
///
|
///
|
||||||
/// This trait is not implemented directly, and only exists to make the
|
/// This is the amount of time after the shutdown signal is received the
|
||||||
/// intent clearer. A `Stream<Item=(Io, SocketAddr), Error=io::Error>`
|
/// server will wait for all pending connections to finish. If the timeout
|
||||||
/// should be implemented instead.
|
/// elapses then the server will be forcibly shut down.
|
||||||
pub trait Accept: Stream<Error=io::Error> {
|
///
|
||||||
#[doc(hidden)]
|
/// This defaults to 1s.
|
||||||
type Output: Io + 'static;
|
pub fn shutdown_timeout(&mut self, timeout: Duration) -> &mut Self {
|
||||||
#[doc(hidden)]
|
self.shutdown_timeout = timeout;
|
||||||
type Stream: Stream<Item=(Self::Output, SocketAddr), Error=io::Error> + 'static;
|
self
|
||||||
|
|
||||||
#[doc(hidden)]
|
|
||||||
fn accept(self) -> Accepter<Self::Stream, Self::Output>
|
|
||||||
where Self: Sized;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(missing_debug_implementations)]
|
/// Execute this server infinitely.
|
||||||
pub struct Accepter<T: Stream<Item=(I, SocketAddr), Error=io::Error> + 'static, I: Io + 'static>(T, ::std::marker::PhantomData<I>);
|
///
|
||||||
|
/// This method does not currently return, but it will return an error if
|
||||||
|
/// one occurs.
|
||||||
|
pub fn run(self) -> ::Result<()> {
|
||||||
|
self.run_until(future::empty())
|
||||||
|
}
|
||||||
|
|
||||||
impl<T, I> Stream for Accepter<T, I>
|
/// Execute this server until the given future, `shutdown_signal`, resolves.
|
||||||
where T: Stream<Item=(I, SocketAddr), Error=io::Error>,
|
///
|
||||||
I: Io + 'static,
|
/// This method, like `run` above, is used to execute this HTTP server. The
|
||||||
|
/// difference with `run`, however, is that this method allows for shutdown
|
||||||
|
/// in a graceful fashion. The future provided is interpreted as a signal to
|
||||||
|
/// shut down the server when it resolves.
|
||||||
|
///
|
||||||
|
/// This method will block the current thread executing the HTTP server.
|
||||||
|
/// When the `shutdown_signal` has resolved then the TCP listener will be
|
||||||
|
/// unbound (dropped). The thread will continue to block for a maximum of
|
||||||
|
/// `shutdown_timeout` time waiting for active connections to shut down.
|
||||||
|
/// Once the `shutdown_timeout` elapses or all active connections are
|
||||||
|
/// cleaned out then this method will return.
|
||||||
|
pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()>
|
||||||
|
where F: Future<Item = (), Error = ::Error>,
|
||||||
{
|
{
|
||||||
type Item = T::Item;
|
let Server { protocol, new_service, mut core, listener, shutdown_timeout } = self;
|
||||||
type Error = io::Error;
|
let handle = core.handle();
|
||||||
|
|
||||||
#[inline]
|
// Mini future to track the number of active services
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
let info = Rc::new(RefCell::new(Info {
|
||||||
self.0.poll()
|
active: 0,
|
||||||
|
blocker: None,
|
||||||
|
}));
|
||||||
|
|
||||||
|
// Future for our server's execution
|
||||||
|
let srv = listener.incoming().for_each(|(socket, addr)| {
|
||||||
|
let s = NotifyService {
|
||||||
|
inner: try!(new_service.new_service()),
|
||||||
|
info: Rc::downgrade(&info),
|
||||||
|
};
|
||||||
|
info.borrow_mut().active += 1;
|
||||||
|
protocol.bind_connection(&handle, socket, addr, s);
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
// Main execution of the server. Here we use `select` to wait for either
|
||||||
|
// `incoming` or `f` to resolve. We know that `incoming` will never
|
||||||
|
// resolve with a success (it's infinite) so we're actually just waiting
|
||||||
|
// for an error or for `f`, our shutdown signal.
|
||||||
|
//
|
||||||
|
// When we get a shutdown signal (`Ok`) then we drop the TCP listener to
|
||||||
|
// stop accepting incoming connections.
|
||||||
|
match core.run(shutdown_signal.select(srv.map_err(|e| e.into()))) {
|
||||||
|
Ok(((), _incoming)) => {}
|
||||||
|
Err((e, _other)) => return Err(e),
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, I> Accept for T
|
// Ok we've stopped accepting new connections at this point, but we want
|
||||||
where T: Stream<Item=(I, SocketAddr), Error=io::Error> + 'static,
|
// to give existing connections a chance to clear themselves out. Wait
|
||||||
I: Io + 'static,
|
// at most `shutdown_timeout` time before we just return clearing
|
||||||
{
|
// everything out.
|
||||||
type Output = I;
|
//
|
||||||
type Stream = T;
|
// Our custom `WaitUntilZero` will resolve once all services constructed
|
||||||
|
// here have been destroyed.
|
||||||
fn accept(self) -> Accepter<Self, I> {
|
let timeout = try!(Timeout::new(shutdown_timeout, &handle));
|
||||||
Accepter(self, ::std::marker::PhantomData)
|
let wait = WaitUntilZero { info: info.clone() };
|
||||||
|
match core.run(wait.select(timeout)) {
|
||||||
|
Ok(_) => Ok(()),
|
||||||
|
Err((e, _)) => return Err(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: fmt::Debug> fmt::Debug for Server<S> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
f.debug_struct("Server")
|
||||||
|
.field("core", &"...")
|
||||||
|
.field("listener", &self.listener)
|
||||||
|
.field("new_service", &self.new_service)
|
||||||
|
.field("protocol", &self.protocol)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct NotifyService<S> {
|
||||||
|
inner: S,
|
||||||
|
info: Weak<RefCell<Info>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct WaitUntilZero {
|
||||||
|
info: Rc<RefCell<Info>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Info {
|
||||||
|
active: usize,
|
||||||
|
blocker: Option<Task>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Service> Service for NotifyService<S> {
|
||||||
|
type Request = S::Request;
|
||||||
|
type Response = S::Response;
|
||||||
|
type Error = S::Error;
|
||||||
|
type Future = S::Future;
|
||||||
|
|
||||||
|
fn call(&self, message: Self::Request) -> Self::Future {
|
||||||
|
self.inner.call(message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Drop for NotifyService<S> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let info = match self.info.upgrade() {
|
||||||
|
Some(info) => info,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
let mut info = info.borrow_mut();
|
||||||
|
info.active -= 1;
|
||||||
|
if info.active == 0 {
|
||||||
|
if let Some(task) = info.blocker.take() {
|
||||||
|
task.unpark();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for WaitUntilZero {
|
||||||
|
type Item = ();
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<(), io::Error> {
|
||||||
|
let mut info = self.info.borrow_mut();
|
||||||
|
if info.active == 0 {
|
||||||
|
Ok(().into())
|
||||||
|
} else {
|
||||||
|
info.blocker = Some(task::park());
|
||||||
|
Ok(Async::NotReady)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,27 +4,29 @@ extern crate futures;
|
|||||||
extern crate spmc;
|
extern crate spmc;
|
||||||
extern crate pretty_env_logger;
|
extern crate pretty_env_logger;
|
||||||
|
|
||||||
use futures::Future;
|
use futures::{Future, Stream};
|
||||||
use futures::stream::Stream;
|
use futures::sync::oneshot;
|
||||||
|
|
||||||
use std::net::{TcpStream, SocketAddr};
|
use std::net::{TcpStream, SocketAddr};
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use hyper::server::{Server, Request, Response, Service, NewService};
|
use hyper::server::{Http, Request, Response, Service, NewService};
|
||||||
|
|
||||||
struct Serve {
|
struct Serve {
|
||||||
listening: Option<hyper::server::Listening>,
|
addr: SocketAddr,
|
||||||
msg_rx: mpsc::Receiver<Msg>,
|
msg_rx: mpsc::Receiver<Msg>,
|
||||||
reply_tx: spmc::Sender<Reply>,
|
reply_tx: spmc::Sender<Reply>,
|
||||||
spawn_rx: mpsc::Receiver<()>,
|
shutdown_signal: Option<oneshot::Sender<()>>,
|
||||||
|
thread: Option<thread::JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Serve {
|
impl Serve {
|
||||||
fn addr(&self) -> &SocketAddr {
|
fn addr(&self) -> &SocketAddr {
|
||||||
self.listening.as_ref().unwrap().addr()
|
&self.addr
|
||||||
}
|
}
|
||||||
|
|
||||||
fn body(&self) -> Vec<u8> {
|
fn body(&self) -> Vec<u8> {
|
||||||
@@ -66,14 +68,14 @@ impl<'a> ReplyBuilder<'a> {
|
|||||||
|
|
||||||
impl Drop for Serve {
|
impl Drop for Serve {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.listening.take().unwrap().close();
|
drop(self.shutdown_signal.take());
|
||||||
self.spawn_rx.recv().expect("server thread should shutdown cleanly");
|
self.thread.take().unwrap().join().unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct TestService {
|
struct TestService {
|
||||||
tx: mpsc::Sender<Msg>,
|
tx: Arc<Mutex<mpsc::Sender<Msg>>>,
|
||||||
reply: spmc::Receiver<Reply>,
|
reply: spmc::Receiver<Reply>,
|
||||||
_timeout: Option<Duration>,
|
_timeout: Option<Duration>,
|
||||||
}
|
}
|
||||||
@@ -94,7 +96,7 @@ impl NewService for TestService {
|
|||||||
type Request = Request;
|
type Request = Request;
|
||||||
type Response = Response;
|
type Response = Response;
|
||||||
type Error = hyper::Error;
|
type Error = hyper::Error;
|
||||||
|
|
||||||
type Instance = TestService;
|
type Instance = TestService;
|
||||||
|
|
||||||
fn new_service(&self) -> std::io::Result<TestService> {
|
fn new_service(&self) -> std::io::Result<TestService> {
|
||||||
@@ -113,7 +115,7 @@ impl Service for TestService {
|
|||||||
let tx = self.tx.clone();
|
let tx = self.tx.clone();
|
||||||
let replies = self.reply.clone();
|
let replies = self.reply.clone();
|
||||||
req.body().for_each(move |chunk| {
|
req.body().for_each(move |chunk| {
|
||||||
tx.send(Msg::Chunk(chunk.to_vec())).unwrap();
|
tx.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap();
|
||||||
Ok(())
|
Ok(())
|
||||||
}).map(move |_| {
|
}).map(move |_| {
|
||||||
let mut res = Response::new();
|
let mut res = Response::new();
|
||||||
@@ -150,35 +152,32 @@ fn serve() -> Serve {
|
|||||||
fn serve_with_timeout(dur: Option<Duration>) -> Serve {
|
fn serve_with_timeout(dur: Option<Duration>) -> Serve {
|
||||||
let _ = pretty_env_logger::init();
|
let _ = pretty_env_logger::init();
|
||||||
|
|
||||||
let (thread_tx, thread_rx) = mpsc::channel();
|
let (addr_tx, addr_rx) = mpsc::channel();
|
||||||
let (spawn_tx, spawn_rx) = mpsc::channel();
|
|
||||||
let (msg_tx, msg_rx) = mpsc::channel();
|
let (msg_tx, msg_rx) = mpsc::channel();
|
||||||
let (reply_tx, reply_rx) = spmc::channel();
|
let (reply_tx, reply_rx) = spmc::channel();
|
||||||
|
let (shutdown_tx, shutdown_rx) = oneshot::channel();
|
||||||
|
|
||||||
let addr = "127.0.0.1:0".parse().unwrap();
|
let addr = "127.0.0.1:0".parse().unwrap();
|
||||||
|
|
||||||
let thread_name = format!("test-server-{:?}", dur);
|
let thread_name = format!("test-server-{:?}", dur);
|
||||||
thread::Builder::new().name(thread_name).spawn(move || {
|
let thread = thread::Builder::new().name(thread_name).spawn(move || {
|
||||||
let (listening, server) = Server::standalone(move |tokio| {
|
let srv = Http::new().bind(&addr, TestService {
|
||||||
Server::http(&addr, tokio).unwrap()
|
tx: Arc::new(Mutex::new(msg_tx.clone())),
|
||||||
.handle(TestService {
|
_timeout: dur,
|
||||||
tx: msg_tx.clone(),
|
reply: reply_rx,
|
||||||
_timeout: dur,
|
|
||||||
reply: reply_rx,
|
|
||||||
}, tokio)
|
|
||||||
}).unwrap();
|
}).unwrap();
|
||||||
thread_tx.send(listening).unwrap();
|
addr_tx.send(srv.local_addr().unwrap()).unwrap();
|
||||||
server.run();
|
srv.run_until(shutdown_rx.then(|_| Ok(()))).unwrap();
|
||||||
spawn_tx.send(()).unwrap();
|
|
||||||
}).unwrap();
|
}).unwrap();
|
||||||
|
|
||||||
let listening = thread_rx.recv().unwrap();
|
let addr = addr_rx.recv().unwrap();
|
||||||
|
|
||||||
Serve {
|
Serve {
|
||||||
listening: Some(listening),
|
|
||||||
msg_rx: msg_rx,
|
msg_rx: msg_rx,
|
||||||
reply_tx: reply_tx,
|
reply_tx: reply_tx,
|
||||||
spawn_rx: spawn_rx,
|
addr: addr,
|
||||||
|
shutdown_signal: Some(shutdown_tx),
|
||||||
|
thread: Some(thread),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user