feat(server): allow creating Server with shared Handle

1. impl Future for Server [WIP]
2. add method bind_handle to Http
3. add an example to use shared Handle in multiple server
This commit is contained in:
Kam Y. Tse
2017-10-06 12:09:00 +08:00
committed by Sean McArthur
parent 7b2a2050b7
commit 0844dede19
2 changed files with 261 additions and 7 deletions

82
examples/multi_server.rs Normal file
View File

@@ -0,0 +1,82 @@
#![deny(warnings)]
extern crate hyper;
extern crate futures;
extern crate tokio_core;
extern crate pretty_env_logger;
use futures::future::FutureResult;
use hyper::{Get, StatusCode};
use tokio_core::reactor::Core;
use hyper::header::ContentLength;
use hyper::server::{Http, Service, Request, Response};
static INDEX1: &'static [u8] = b"The 1st service!";
static INDEX2: &'static [u8] = b"The 2nd service!";
struct Service1;
struct Service2;
impl Service for Service1 {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = FutureResult<Response, hyper::Error>;
fn call(&self, req: Request) -> Self::Future {
futures::future::ok(match (req.method(), req.path()) {
(&Get, "/") => {
Response::new()
.with_header(ContentLength(INDEX1.len() as u64))
.with_body(INDEX1)
},
_ => {
Response::new()
.with_status(StatusCode::NotFound)
}
})
}
}
impl Service for Service2 {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = FutureResult<Response, hyper::Error>;
fn call(&self, req: Request) -> Self::Future {
futures::future::ok(match (req.method(), req.path()) {
(&Get, "/") => {
Response::new()
.with_header(ContentLength(INDEX2.len() as u64))
.with_body(INDEX2)
},
_ => {
Response::new()
.with_status(StatusCode::NotFound)
}
})
}
}
fn main() {
pretty_env_logger::init().unwrap();
let addr1 = "127.0.0.1:1337".parse().unwrap();
let addr2 = "127.0.0.1:1338".parse().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let srv1 = Http::new().bind_handle(&addr1,|| Ok(Service1), &handle).unwrap();
let srv2 = Http::new().bind_handle(&addr2,|| Ok(Service2), &handle).unwrap();
println!("Listening on http://{}", srv1.local_addr().unwrap());
println!("Listening on http://{}", srv2.local_addr().unwrap());
handle.spawn(srv1.shutdown_signal(futures::future::empty::<(), ()>()));
handle.spawn(srv2.shutdown_signal(futures::future::empty::<(), ()>()));
core.run(futures::future::empty::<(), ()>()).unwrap();
}

View File

@@ -16,10 +16,9 @@ use std::net::SocketAddr;
use std::rc::{Rc, Weak};
use std::time::Duration;
use futures::future;
use futures::task::{self, Task};
use futures::future::{self, Select, Map};
use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
use futures::future::Map;
#[cfg(feature = "compat")]
use http;
@@ -41,6 +40,26 @@ use proto::Body;
pub use proto::response::Response;
pub use proto::request::Request;
// The `Server` can be created use its own `Core`, or an shared `Handle`.
enum Reactor {
// Own its `Core`
Core(Core),
// Share `Handle` with others
Handle(Handle),
}
impl Reactor {
/// Returns a handle to the underlying event loop that this server will be
/// running on.
#[inline]
pub fn handle(&self) -> Handle {
match *self {
Reactor::Core(ref core) => core.handle(),
Reactor::Handle(ref handle) => handle.clone(),
}
}
}
/// An instance of the HTTP protocol, and implementation of tokio-proto's
/// `ServerProto` trait.
///
@@ -63,12 +82,23 @@ where B: Stream<Error=::Error>,
{
protocol: Http<B::Item>,
new_service: S,
core: Core,
reactor: Reactor,
listener: TcpListener,
shutdown_timeout: Duration,
no_proto: bool,
}
/// The Future of an Server.
pub struct ServerFuture<F, S, B>
where B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
server: Server<S, B>,
info: Rc<RefCell<Info>>,
shutdown_signal: F,
shutdown: Option<Select<WaitUntilZero, Timeout>>,
}
impl<B: AsRef<[u8]> + 'static> Http<B> {
/// Creates a new instance of the HTTP protocol, ready to spawn a server or
/// start accepting connections.
@@ -118,7 +148,30 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
Ok(Server {
new_service: new_service,
core: core,
reactor: Reactor::Core(core),
listener: listener,
protocol: self.clone(),
shutdown_timeout: Duration::new(1, 0),
})
}
/// This method allows the ability to share a `Core` with multiple servers.
///
/// Bind the provided `addr` and return a server with a shared `Core`.
///
/// This is method will bind the `addr` provided with a new TCP listener ready
/// to accept connections. Each connection will be processed with the
/// `new_service` object provided as well, creating a new service per
/// connection.
pub fn bind_handle<S, Bd>(&self, addr: &SocketAddr, new_service: S, handle: &Handle) -> ::Result<Server<S, Bd>>
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static,
Bd: Stream<Item=B, Error=::Error>,
{
let listener = TcpListener::bind(addr, &handle)?;
Ok(Server {
new_service: new_service,
reactor: Reactor::Handle(handle.clone()),
listener: listener,
protocol: self.clone(),
shutdown_timeout: Duration::new(1, 0),
@@ -544,7 +597,7 @@ impl<S, B> Server<S, B>
/// Returns a handle to the underlying event loop that this server will be
/// running on.
pub fn handle(&self) -> Handle {
self.core.handle()
self.reactor.handle()
}
/// Configure the amount of time this server will wait for a "graceful
@@ -566,6 +619,21 @@ impl<S, B> Server<S, B>
self
}
/// Configure the `shutdown_signal`.
pub fn shutdown_signal<F>(self, signal: F) -> ServerFuture<F, S, B>
where F: Future<Item = (), Error = ()>
{
ServerFuture {
server: self,
info: Rc::new(RefCell::new(Info {
active: 0,
blocker: None,
})),
shutdown_signal: signal,
shutdown: None,
}
}
/// Execute this server infinitely.
///
/// This method does not currently return, but it will return an error if
@@ -590,7 +658,13 @@ impl<S, B> Server<S, B>
pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()>
where F: Future<Item = (), Error = ()>,
{
let Server { protocol, new_service, mut core, listener, shutdown_timeout, no_proto } = self;
let Server { protocol, new_service, reactor, listener, shutdown_timeout, no_proto } = self;
let mut core = match reactor {
Reactor::Core(core) => core,
_ => panic!("Server does not own its core, use `Handle::spawn()` to run the service!"),
};
let handle = core.handle();
// Mini future to track the number of active services
@@ -649,12 +723,96 @@ impl<S, B> Server<S, B>
}
}
impl<S, B> Future for Server<S, B>
where S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Reactor::Core(_) = self.reactor {
panic!("Server owns its core, use `Server::run()` to run the service!")
}
loop {
match self.listener.accept() {
Ok((socket, addr)) => {
// TODO: use the NotifyService
match self.new_service.new_service() {
Ok(srv) => self.protocol.bind_connection(&self.handle(),
socket,
addr,
srv),
Err(e) => debug!("internal error: {:?}", e),
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
Err(e) => debug!("internal error: {:?}", e),
}
}
}
}
impl<F, S, B> Future for ServerFuture<F, S, B>
where F: Future<Item = (), Error = ()>,
S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
if let Some(ref mut shutdown) = self.shutdown {
match shutdown.poll() {
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err((e, _)) => debug!("internal error: {:?}", e),
}
} else if let Ok(Async::Ready(())) = self.shutdown_signal.poll() {
match Timeout::new(self.server.shutdown_timeout, &self.server.handle()) {
Ok(timeout) => {
let wait = WaitUntilZero { info: self.info.clone() };
self.shutdown = Some(wait.select(timeout))
},
Err(e) => debug!("internal error: {:?}", e),
}
} else {
match self.server.listener.accept() {
Ok((socket, addr)) => {
match self.server.new_service.new_service() {
Ok(inner_srv) => {
let srv = NotifyService {
inner: inner_srv,
info: Rc::downgrade(&self.info),
};
self.info.borrow_mut().active += 1;
self.server.protocol.bind_connection(&self.server.handle(),
socket,
addr,
srv)
},
Err(e) => debug!("internal error: {:?}", e),
}
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
Err(e) => debug!("internal error: {:?}", e),
}
}
}
}
}
impl<S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for Server<S, B>
where B::Item: AsRef<[u8]>
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Server")
.field("core", &"...")
.field("reactor", &"...")
.field("listener", &self.listener)
.field("new_service", &self.new_service)
.field("protocol", &self.protocol)
@@ -662,6 +820,20 @@ where B::Item: AsRef<[u8]>
}
}
impl <F, S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for ServerFuture<F, S, B>
where B::Item: AsRef<[u8]>,
F: Future<Item = (), Error = ()>
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("ServerFuture")
.field("server", &self.server)
.field("info", &"...")
.field("shutdown_signal", &"...")
.field("shutdown", &"...")
.finish()
}
}
struct NotifyService<S> {
inner: S,
info: Weak<RefCell<Info>>,