feat(server): add server::Serve that can use a shared Handle
- Adds `Http::serve_addr_handle` which will bind to an address with a provided `Handle`, and return a `Serve`. - Adds `server::Serve` which is a `Stream` of incoming `Connection`s being bound by a `NewService`. - Renames `Http::no_proto` to `Http::serve_connection`.
This commit is contained in:
@@ -4,6 +4,7 @@ extern crate futures;
|
||||
extern crate tokio_core;
|
||||
extern crate pretty_env_logger;
|
||||
|
||||
use futures::{Future, Stream};
|
||||
use futures::future::FutureResult;
|
||||
|
||||
use hyper::{Get, StatusCode};
|
||||
@@ -14,10 +15,9 @@ use hyper::server::{Http, Service, Request, Response};
|
||||
static INDEX1: &'static [u8] = b"The 1st service!";
|
||||
static INDEX2: &'static [u8] = b"The 2nd service!";
|
||||
|
||||
struct Service1;
|
||||
struct Service2;
|
||||
struct Srv(&'static [u8]);
|
||||
|
||||
impl Service for Service1 {
|
||||
impl Service for Srv {
|
||||
type Request = Request;
|
||||
type Response = Response;
|
||||
type Error = hyper::Error;
|
||||
@@ -27,30 +27,8 @@ impl Service for Service1 {
|
||||
futures::future::ok(match (req.method(), req.path()) {
|
||||
(&Get, "/") => {
|
||||
Response::new()
|
||||
.with_header(ContentLength(INDEX1.len() as u64))
|
||||
.with_body(INDEX1)
|
||||
},
|
||||
_ => {
|
||||
Response::new()
|
||||
.with_status(StatusCode::NotFound)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl Service for Service2 {
|
||||
type Request = Request;
|
||||
type Response = Response;
|
||||
type Error = hyper::Error;
|
||||
type Future = FutureResult<Response, hyper::Error>;
|
||||
|
||||
fn call(&self, req: Request) -> Self::Future {
|
||||
futures::future::ok(match (req.method(), req.path()) {
|
||||
(&Get, "/") => {
|
||||
Response::new()
|
||||
.with_header(ContentLength(INDEX2.len() as u64))
|
||||
.with_body(INDEX2)
|
||||
.with_header(ContentLength(self.0.len() as u64))
|
||||
.with_body(self.0)
|
||||
},
|
||||
_ => {
|
||||
Response::new()
|
||||
@@ -70,13 +48,23 @@ fn main() {
|
||||
let mut core = Core::new().unwrap();
|
||||
let handle = core.handle();
|
||||
|
||||
let srv1 = Http::new().bind_handle(&addr1,|| Ok(Service1), &handle).unwrap();
|
||||
let srv2 = Http::new().bind_handle(&addr2,|| Ok(Service2), &handle).unwrap();
|
||||
let srv1 = Http::new().serve_addr_handle(&addr1, &handle, || Ok(Srv(INDEX1))).unwrap();
|
||||
let srv2 = Http::new().serve_addr_handle(&addr2, &handle, || Ok(Srv(INDEX2))).unwrap();
|
||||
|
||||
println!("Listening on http://{}", srv1.local_addr().unwrap());
|
||||
println!("Listening on http://{}", srv2.local_addr().unwrap());
|
||||
println!("Listening on http://{}", srv1.incoming_ref().local_addr());
|
||||
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
|
||||
|
||||
let handle1 = handle.clone();
|
||||
handle.spawn(srv1.for_each(move |conn| {
|
||||
handle1.spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err)));
|
||||
Ok(())
|
||||
}).map_err(|_| ()));
|
||||
|
||||
let handle2 = handle.clone();
|
||||
handle.spawn(srv2.for_each(move |conn| {
|
||||
handle2.spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err)));
|
||||
Ok(())
|
||||
}).map_err(|_| ()));
|
||||
|
||||
handle.spawn(srv1.shutdown_signal(futures::future::empty::<(), ()>()));
|
||||
handle.spawn(srv2.shutdown_signal(futures::future::empty::<(), ()>()));
|
||||
core.run(futures::future::empty::<(), ()>()).unwrap();
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::rc::{Rc, Weak};
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::task::{self, Task};
|
||||
use futures::future::{self, Select, Map};
|
||||
use futures::future::{self, Map};
|
||||
use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
|
||||
|
||||
#[cfg(feature = "compat")]
|
||||
@@ -25,7 +25,7 @@ use http;
|
||||
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio::reactor::{Core, Handle, Timeout};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio_proto::BindServer;
|
||||
use tokio_proto::streaming::Message;
|
||||
use tokio_proto::streaming::pipeline::{Transport, Frame, ServerProto};
|
||||
@@ -36,30 +36,11 @@ use proto::response;
|
||||
use proto::request;
|
||||
#[cfg(feature = "compat")]
|
||||
use proto::Body;
|
||||
use self::hyper_service::HyperService;
|
||||
|
||||
pub use proto::response::Response;
|
||||
pub use proto::request::Request;
|
||||
|
||||
// The `Server` can be created use its own `Core`, or an shared `Handle`.
|
||||
enum Reactor {
|
||||
// Own its `Core`
|
||||
Core(Core),
|
||||
// Share `Handle` with others
|
||||
Handle(Handle),
|
||||
}
|
||||
|
||||
impl Reactor {
|
||||
/// Returns a handle to the underlying event loop that this server will be
|
||||
/// running on.
|
||||
#[inline]
|
||||
pub fn handle(&self) -> Handle {
|
||||
match *self {
|
||||
Reactor::Core(ref core) => core.handle(),
|
||||
Reactor::Handle(ref handle) => handle.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An instance of the HTTP protocol, and implementation of tokio-proto's
|
||||
/// `ServerProto` trait.
|
||||
///
|
||||
@@ -82,23 +63,62 @@ where B: Stream<Error=::Error>,
|
||||
{
|
||||
protocol: Http<B::Item>,
|
||||
new_service: S,
|
||||
reactor: Reactor,
|
||||
reactor: Core,
|
||||
listener: TcpListener,
|
||||
shutdown_timeout: Duration,
|
||||
no_proto: bool,
|
||||
}
|
||||
|
||||
/// The Future of an Server.
|
||||
pub struct ServerFuture<F, S, B>
|
||||
where B: Stream<Error=::Error>,
|
||||
B::Item: AsRef<[u8]>,
|
||||
{
|
||||
server: Server<S, B>,
|
||||
info: Rc<RefCell<Info>>,
|
||||
shutdown_signal: F,
|
||||
shutdown: Option<Select<WaitUntilZero, Timeout>>,
|
||||
/// A stream mapping incoming IOs to new services.
|
||||
///
|
||||
/// Yields `Connection`s that are futures that should be put on a reactor.
|
||||
#[must_use = "streams do nothing unless polled"]
|
||||
#[derive(Debug)]
|
||||
pub struct Serve<I, S> {
|
||||
incoming: I,
|
||||
new_service: S,
|
||||
protocol: Http,
|
||||
}
|
||||
|
||||
/*
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
#[derive(Debug)]
|
||||
pub struct SpawnAll<I, S, E> {
|
||||
executor: E,
|
||||
serve: Serve<I, S>,
|
||||
}
|
||||
*/
|
||||
|
||||
/// A stream of connections from binding to an address.
|
||||
#[must_use = "streams do nothing unless polled"]
|
||||
#[derive(Debug)]
|
||||
pub struct AddrStream {
|
||||
addr: SocketAddr,
|
||||
listener: TcpListener,
|
||||
}
|
||||
|
||||
/// A future binding a connection with a Service.
|
||||
///
|
||||
/// Polling this future will drive HTTP forward.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct Connection<I, S>
|
||||
where
|
||||
S: HyperService,
|
||||
S::ResponseBody: Stream<Error=::Error>,
|
||||
<S::ResponseBody as Stream>::Item: AsRef<[u8]>,
|
||||
{
|
||||
conn: proto::dispatch::Dispatcher<
|
||||
proto::dispatch::Server<S>,
|
||||
S::ResponseBody,
|
||||
I,
|
||||
<S::ResponseBody as Stream>::Item,
|
||||
proto::ServerTransaction,
|
||||
proto::KA,
|
||||
>,
|
||||
}
|
||||
|
||||
// ===== impl Http =====
|
||||
|
||||
impl<B: AsRef<[u8]> + 'static> Http<B> {
|
||||
/// Creates a new instance of the HTTP protocol, ready to spawn a server or
|
||||
/// start accepting connections.
|
||||
@@ -148,30 +168,7 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
|
||||
|
||||
Ok(Server {
|
||||
new_service: new_service,
|
||||
reactor: Reactor::Core(core),
|
||||
listener: listener,
|
||||
protocol: self.clone(),
|
||||
shutdown_timeout: Duration::new(1, 0),
|
||||
})
|
||||
}
|
||||
|
||||
/// This method allows the ability to share a `Core` with multiple servers.
|
||||
///
|
||||
/// Bind the provided `addr` and return a server with a shared `Core`.
|
||||
///
|
||||
/// This is method will bind the `addr` provided with a new TCP listener ready
|
||||
/// to accept connections. Each connection will be processed with the
|
||||
/// `new_service` object provided as well, creating a new service per
|
||||
/// connection.
|
||||
pub fn bind_handle<S, Bd>(&self, addr: &SocketAddr, new_service: S, handle: &Handle) -> ::Result<Server<S, Bd>>
|
||||
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static,
|
||||
Bd: Stream<Item=B, Error=::Error>,
|
||||
{
|
||||
let listener = TcpListener::bind(addr, &handle)?;
|
||||
|
||||
Ok(Server {
|
||||
new_service: new_service,
|
||||
reactor: Reactor::Handle(handle.clone()),
|
||||
reactor: core,
|
||||
listener: listener,
|
||||
protocol: self.clone(),
|
||||
shutdown_timeout: Duration::new(1, 0),
|
||||
@@ -179,6 +176,7 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
/// Bind a `NewService` using types from the `http` crate.
|
||||
///
|
||||
/// See `Http::bind`.
|
||||
@@ -220,29 +218,6 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Bind a connection together with a Service.
|
||||
///
|
||||
/// This returns a Future that must be polled in order for HTTP to be
|
||||
/// driven on the connection.
|
||||
///
|
||||
/// This additionally skips the tokio-proto infrastructure internally.
|
||||
pub fn no_proto<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S>
|
||||
where S: Service<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static,
|
||||
Bd: Stream<Item=B, Error=::Error> + 'static,
|
||||
I: AsyncRead + AsyncWrite + 'static,
|
||||
|
||||
{
|
||||
let ka = if self.keep_alive {
|
||||
proto::KA::Busy
|
||||
} else {
|
||||
proto::KA::Disabled
|
||||
};
|
||||
let mut conn = proto::Conn::new(io, ka);
|
||||
conn.set_flush_pipeline(self.pipeline);
|
||||
Connection {
|
||||
conn: proto::dispatch::Dispatcher::new(proto::dispatch::Server::new(service), conn),
|
||||
}
|
||||
}
|
||||
|
||||
/// Bind a `Service` using types from the `http` crate.
|
||||
///
|
||||
@@ -262,124 +237,71 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
|
||||
remote_addr: remote_addr,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
use self::hyper_service::HyperService;
|
||||
mod hyper_service {
|
||||
use super::{Request, Response, Service, Stream};
|
||||
/// A "trait alias" for any type that implements `Service` with hyper's
|
||||
/// Request, Response, and Error types, and a streaming body.
|
||||
/// This method allows the ability to share a `Core` with multiple servers.
|
||||
///
|
||||
/// There is an auto implementation inside hyper, so no one can actually
|
||||
/// implement this trait. It simply exists to reduce the amount of generics
|
||||
/// needed.
|
||||
pub trait HyperService: Service + Sealed {
|
||||
#[doc(hidden)]
|
||||
type ResponseBody;
|
||||
#[doc(hidden)]
|
||||
type Sealed: Sealed2;
|
||||
}
|
||||
|
||||
pub trait Sealed {}
|
||||
pub trait Sealed2 {}
|
||||
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Opaque {
|
||||
_inner: (),
|
||||
}
|
||||
|
||||
impl Sealed2 for Opaque {}
|
||||
|
||||
impl<S, B> Sealed for S
|
||||
where
|
||||
S: Service<
|
||||
Request=Request,
|
||||
Response=Response<B>,
|
||||
Error=::Error,
|
||||
>,
|
||||
B: Stream<Error=::Error>,
|
||||
B::Item: AsRef<[u8]>,
|
||||
{}
|
||||
|
||||
impl<S, B> HyperService for S
|
||||
where
|
||||
S: Service<
|
||||
Request=Request,
|
||||
Response=Response<B>,
|
||||
Error=::Error,
|
||||
>,
|
||||
S: Sealed,
|
||||
B: Stream<Error=::Error>,
|
||||
B::Item: AsRef<[u8]>,
|
||||
/// Bind the provided `addr` and return a server with a shared `Core`.
|
||||
///
|
||||
/// This is method will bind the `addr` provided with a new TCP listener ready
|
||||
/// to accept connections. Each connection will be processed with the
|
||||
/// `new_service` object provided as well, creating a new service per
|
||||
/// connection.
|
||||
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrStream, S>>
|
||||
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error>,
|
||||
Bd: Stream<Item=B, Error=::Error>,
|
||||
{
|
||||
type ResponseBody = B;
|
||||
type Sealed = Opaque;
|
||||
let listener = TcpListener::bind(addr, &handle)?;
|
||||
let incoming = AddrStream {
|
||||
addr: listener.local_addr()?,
|
||||
listener: listener,
|
||||
};
|
||||
Ok(self.serve(incoming, new_service))
|
||||
}
|
||||
|
||||
}
|
||||
/// A future binding a connection with a Service.
|
||||
///
|
||||
/// Polling this future will drive HTTP forward.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct Connection<I, S>
|
||||
where
|
||||
S: HyperService,
|
||||
S::ResponseBody: Stream<Error=::Error>,
|
||||
<S::ResponseBody as Stream>::Item: AsRef<[u8]>,
|
||||
{
|
||||
conn: proto::dispatch::Dispatcher<proto::dispatch::Server<S>, S::ResponseBody, I, <S::ResponseBody as Stream>::Item, proto::ServerTransaction, proto::KA>,
|
||||
}
|
||||
|
||||
impl<I, B, S> Future for Connection<I, S>
|
||||
where S: Service<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
|
||||
I: AsyncRead + AsyncWrite + 'static,
|
||||
B: Stream<Error=::Error> + 'static,
|
||||
B::Item: AsRef<[u8]>,
|
||||
{
|
||||
type Item = self::unnameable::Opaque;
|
||||
type Error = ::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
try_ready!(self.conn.poll());
|
||||
Ok(self::unnameable::opaque().into())
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S> fmt::Debug for Connection<I, S>
|
||||
where
|
||||
S: HyperService,
|
||||
S::ResponseBody: Stream<Error=::Error>,
|
||||
<S::ResponseBody as Stream>::Item: AsRef<[u8]>,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("Connection")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
mod unnameable {
|
||||
// This type is specifically not exported outside the crate,
|
||||
// so no one can actually name the type. With no methods, we make no
|
||||
// promises about this type.
|
||||
//
|
||||
// All of that to say we can eventually replace the type returned
|
||||
// to something else, and it would not be a breaking change.
|
||||
//
|
||||
// We may want to eventually yield the `T: AsyncRead + AsyncWrite`, which
|
||||
// doesn't have a `Debug` bound. So, this type can't implement `Debug`
|
||||
// either, so the type change doesn't break people.
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Opaque {
|
||||
_inner: (),
|
||||
//TODO: make public
|
||||
fn serve<I, S, Bd>(&self, incoming: I, new_service: S) -> Serve<I, S>
|
||||
where I: Stream<Error=::std::io::Error>,
|
||||
I::Item: AsyncRead + AsyncWrite,
|
||||
S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error>,
|
||||
Bd: Stream<Item=B, Error=::Error>,
|
||||
{
|
||||
Serve {
|
||||
incoming: incoming,
|
||||
new_service: new_service,
|
||||
protocol: Http {
|
||||
keep_alive: self.keep_alive,
|
||||
pipeline: self.pipeline,
|
||||
_marker: PhantomData,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn opaque() -> Opaque {
|
||||
Opaque {
|
||||
_inner: (),
|
||||
/// Bind a connection together with a Service.
|
||||
///
|
||||
/// This returns a Future that must be polled in order for HTTP to be
|
||||
/// driven on the connection.
|
||||
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S>
|
||||
where S: Service<Request = Request, Response = Response<Bd>, Error = ::Error>,
|
||||
Bd: Stream<Error=::Error>,
|
||||
Bd::Item: AsRef<[u8]>,
|
||||
I: AsyncRead + AsyncWrite,
|
||||
|
||||
{
|
||||
let ka = if self.keep_alive {
|
||||
proto::KA::Busy
|
||||
} else {
|
||||
proto::KA::Disabled
|
||||
};
|
||||
let mut conn = proto::Conn::new(io, ka);
|
||||
conn.set_flush_pipeline(self.pipeline);
|
||||
Connection {
|
||||
conn: proto::dispatch::Dispatcher::new(proto::dispatch::Server::new(service), conn),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl<B> Clone for Http<B> {
|
||||
fn clone(&self) -> Http<B> {
|
||||
Http {
|
||||
@@ -584,6 +506,8 @@ impl<T, B> Service for HttpService<T>
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Server =====
|
||||
|
||||
impl<S, B> Server<S, B>
|
||||
where S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
|
||||
B: Stream<Error=::Error> + 'static,
|
||||
@@ -619,21 +543,6 @@ impl<S, B> Server<S, B>
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure the `shutdown_signal`.
|
||||
pub fn shutdown_signal<F>(self, signal: F) -> ServerFuture<F, S, B>
|
||||
where F: Future<Item = (), Error = ()>
|
||||
{
|
||||
ServerFuture {
|
||||
server: self,
|
||||
info: Rc::new(RefCell::new(Info {
|
||||
active: 0,
|
||||
blocker: None,
|
||||
})),
|
||||
shutdown_signal: signal,
|
||||
shutdown: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute this server infinitely.
|
||||
///
|
||||
/// This method does not currently return, but it will return an error if
|
||||
@@ -658,14 +567,9 @@ impl<S, B> Server<S, B>
|
||||
pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()>
|
||||
where F: Future<Item = (), Error = ()>,
|
||||
{
|
||||
let Server { protocol, new_service, reactor, listener, shutdown_timeout, no_proto } = self;
|
||||
let Server { protocol, new_service, mut reactor, listener, shutdown_timeout, no_proto } = self;
|
||||
|
||||
let mut core = match reactor {
|
||||
Reactor::Core(core) => core,
|
||||
_ => panic!("Server does not own its core, use `Handle::spawn()` to run the service!"),
|
||||
};
|
||||
|
||||
let handle = core.handle();
|
||||
let handle = reactor.handle();
|
||||
|
||||
// Mini future to track the number of active services
|
||||
let info = Rc::new(RefCell::new(Info {
|
||||
@@ -681,7 +585,7 @@ impl<S, B> Server<S, B>
|
||||
};
|
||||
info.borrow_mut().active += 1;
|
||||
if no_proto {
|
||||
let fut = protocol.no_proto(socket, s)
|
||||
let fut = protocol.serve_connection(socket, s)
|
||||
.map(|_| ())
|
||||
.map_err(|err| error!("no_proto error: {}", err));
|
||||
handle.spawn(fut);
|
||||
@@ -702,7 +606,7 @@ impl<S, B> Server<S, B>
|
||||
//
|
||||
// When we get a shutdown signal (`Ok`) then we drop the TCP listener to
|
||||
// stop accepting incoming connections.
|
||||
match core.run(shutdown_signal.select(srv)) {
|
||||
match reactor.run(shutdown_signal.select(srv)) {
|
||||
Ok(((), _incoming)) => {}
|
||||
Err((e, _other)) => return Err(e.into()),
|
||||
}
|
||||
@@ -716,97 +620,13 @@ impl<S, B> Server<S, B>
|
||||
// here have been destroyed.
|
||||
let timeout = try!(Timeout::new(shutdown_timeout, &handle));
|
||||
let wait = WaitUntilZero { info: info.clone() };
|
||||
match core.run(wait.select(timeout)) {
|
||||
match reactor.run(wait.select(timeout)) {
|
||||
Ok(_) => Ok(()),
|
||||
Err((e, _)) => Err(e.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, B> Future for Server<S, B>
|
||||
where S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
|
||||
B: Stream<Error=::Error> + 'static,
|
||||
B::Item: AsRef<[u8]>,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Reactor::Core(_) = self.reactor {
|
||||
panic!("Server owns its core, use `Server::run()` to run the service!")
|
||||
}
|
||||
|
||||
loop {
|
||||
match self.listener.accept() {
|
||||
Ok((socket, addr)) => {
|
||||
// TODO: use the NotifyService
|
||||
match self.new_service.new_service() {
|
||||
Ok(srv) => self.protocol.bind_connection(&self.handle(),
|
||||
socket,
|
||||
addr,
|
||||
srv),
|
||||
Err(e) => debug!("internal error: {:?}", e),
|
||||
}
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
|
||||
Err(e) => debug!("internal error: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, S, B> Future for ServerFuture<F, S, B>
|
||||
where F: Future<Item = (), Error = ()>,
|
||||
S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
|
||||
B: Stream<Error=::Error> + 'static,
|
||||
B::Item: AsRef<[u8]>,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
loop {
|
||||
if let Some(ref mut shutdown) = self.shutdown {
|
||||
match shutdown.poll() {
|
||||
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err((e, _)) => debug!("internal error: {:?}", e),
|
||||
}
|
||||
} else if let Ok(Async::Ready(())) = self.shutdown_signal.poll() {
|
||||
match Timeout::new(self.server.shutdown_timeout, &self.server.handle()) {
|
||||
Ok(timeout) => {
|
||||
let wait = WaitUntilZero { info: self.info.clone() };
|
||||
self.shutdown = Some(wait.select(timeout))
|
||||
},
|
||||
Err(e) => debug!("internal error: {:?}", e),
|
||||
}
|
||||
} else {
|
||||
match self.server.listener.accept() {
|
||||
Ok((socket, addr)) => {
|
||||
match self.server.new_service.new_service() {
|
||||
Ok(inner_srv) => {
|
||||
let srv = NotifyService {
|
||||
inner: inner_srv,
|
||||
info: Rc::downgrade(&self.info),
|
||||
};
|
||||
self.info.borrow_mut().active += 1;
|
||||
self.server.protocol.bind_connection(&self.server.handle(),
|
||||
socket,
|
||||
addr,
|
||||
srv)
|
||||
},
|
||||
Err(e) => debug!("internal error: {:?}", e),
|
||||
}
|
||||
},
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
|
||||
Err(e) => debug!("internal error: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for Server<S, B>
|
||||
where B::Item: AsRef<[u8]>
|
||||
{
|
||||
@@ -820,17 +640,164 @@ where B::Item: AsRef<[u8]>
|
||||
}
|
||||
}
|
||||
|
||||
impl <F, S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for ServerFuture<F, S, B>
|
||||
where B::Item: AsRef<[u8]>,
|
||||
F: Future<Item = (), Error = ()>
|
||||
// ===== impl Serve =====
|
||||
|
||||
impl<I, S> Serve<I, S> {
|
||||
/*
|
||||
/// Spawn all incoming connections onto the provide executor.
|
||||
pub fn spawn_all<E>(self, executor: E) -> SpawnAll<I, S, E> {
|
||||
SpawnAll {
|
||||
executor: executor,
|
||||
serve: self,
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
/// Get a reference to the incoming stream.
|
||||
#[inline]
|
||||
pub fn incoming_ref(&self) -> &I {
|
||||
&self.incoming
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S, B> Stream for Serve<I, S>
|
||||
where
|
||||
I: Stream<Error=io::Error>,
|
||||
I::Item: AsyncRead + AsyncWrite,
|
||||
S: NewService<Request=Request, Response=Response<B>, Error=::Error>,
|
||||
B: Stream<Error=::Error>,
|
||||
B::Item: AsRef<[u8]>,
|
||||
{
|
||||
type Item = Connection<I::Item, S::Instance>;
|
||||
type Error = ::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
if let Some(io) = try_ready!(self.incoming.poll()) {
|
||||
let service = self.new_service.new_service()?;
|
||||
Ok(Async::Ready(Some(self.protocol.serve_connection(io, service))))
|
||||
} else {
|
||||
Ok(Async::Ready(None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl SpawnAll =====
|
||||
|
||||
/*
|
||||
impl<I, S, E> Future for SpawnAll<I, S, E>
|
||||
where
|
||||
I: Stream<Error=io::Error>,
|
||||
I::Item: AsyncRead + AsyncWrite,
|
||||
S: NewService<Request=Request, Response=Response<B>, Error=::Error>,
|
||||
B: Stream<Error=::Error>,
|
||||
B::Item: AsRef<[u8]>,
|
||||
//E: Executor<Connection<I::Item, S::Instance>>,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
loop {
|
||||
if let Some(conn) = try_ready!(self.serve.poll()) {
|
||||
let fut = conn
|
||||
.map(|_| ())
|
||||
.map_err(|err| debug!("conn error: {}", err));
|
||||
match self.executor.execute(fut) {
|
||||
Ok(()) => (),
|
||||
Err(err) => match err.kind() {
|
||||
ExecuteErrorKind::NoCapacity => {
|
||||
debug!("SpawnAll::poll; executor no capacity");
|
||||
// continue loop
|
||||
},
|
||||
ExecuteErrorKind::Shutdown | _ => {
|
||||
debug!("SpawnAll::poll; executor shutdown");
|
||||
return Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// ===== impl Connection =====
|
||||
|
||||
impl<I, B, S> Future for Connection<I, S>
|
||||
where S: Service<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
|
||||
I: AsyncRead + AsyncWrite + 'static,
|
||||
B: Stream<Error=::Error> + 'static,
|
||||
B::Item: AsRef<[u8]>,
|
||||
{
|
||||
type Item = self::unnameable::Opaque;
|
||||
type Error = ::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
try_ready!(self.conn.poll());
|
||||
Ok(self::unnameable::opaque().into())
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S> fmt::Debug for Connection<I, S>
|
||||
where
|
||||
S: HyperService,
|
||||
S::ResponseBody: Stream<Error=::Error>,
|
||||
<S::ResponseBody as Stream>::Item: AsRef<[u8]>,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("ServerFuture")
|
||||
.field("server", &self.server)
|
||||
.field("info", &"...")
|
||||
.field("shutdown_signal", &"...")
|
||||
.field("shutdown", &"...")
|
||||
.finish()
|
||||
f.debug_struct("Connection")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
mod unnameable {
|
||||
// This type is specifically not exported outside the crate,
|
||||
// so no one can actually name the type. With no methods, we make no
|
||||
// promises about this type.
|
||||
//
|
||||
// All of that to say we can eventually replace the type returned
|
||||
// to something else, and it would not be a breaking change.
|
||||
//
|
||||
// We may want to eventually yield the `T: AsyncRead + AsyncWrite`, which
|
||||
// doesn't have a `Debug` bound. So, this type can't implement `Debug`
|
||||
// either, so the type change doesn't break people.
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Opaque {
|
||||
_inner: (),
|
||||
}
|
||||
|
||||
pub fn opaque() -> Opaque {
|
||||
Opaque {
|
||||
_inner: (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl AddrStream =====
|
||||
|
||||
impl AddrStream {
|
||||
/// Get the local address bound to this listener.
|
||||
pub fn local_addr(&self) -> SocketAddr {
|
||||
self.addr
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for AddrStream {
|
||||
type Item = TcpStream;
|
||||
type Error = ::std::io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
loop {
|
||||
match self.listener.accept() {
|
||||
Ok((socket, _addr)) => {
|
||||
return Ok(Async::Ready(Some(socket)));
|
||||
},
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
|
||||
Err(e) => debug!("internal error: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -889,3 +856,55 @@ impl Future for WaitUntilZero {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mod hyper_service {
|
||||
use super::{Request, Response, Service, Stream};
|
||||
/// A "trait alias" for any type that implements `Service` with hyper's
|
||||
/// Request, Response, and Error types, and a streaming body.
|
||||
///
|
||||
/// There is an auto implementation inside hyper, so no one can actually
|
||||
/// implement this trait. It simply exists to reduce the amount of generics
|
||||
/// needed.
|
||||
pub trait HyperService: Service + Sealed {
|
||||
#[doc(hidden)]
|
||||
type ResponseBody;
|
||||
#[doc(hidden)]
|
||||
type Sealed: Sealed2;
|
||||
}
|
||||
|
||||
pub trait Sealed {}
|
||||
pub trait Sealed2 {}
|
||||
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Opaque {
|
||||
_inner: (),
|
||||
}
|
||||
|
||||
impl Sealed2 for Opaque {}
|
||||
|
||||
impl<S, B> Sealed for S
|
||||
where
|
||||
S: Service<
|
||||
Request=Request,
|
||||
Response=Response<B>,
|
||||
Error=::Error,
|
||||
>,
|
||||
B: Stream<Error=::Error>,
|
||||
B::Item: AsRef<[u8]>,
|
||||
{}
|
||||
|
||||
impl<S, B> HyperService for S
|
||||
where
|
||||
S: Service<
|
||||
Request=Request,
|
||||
Response=Response<B>,
|
||||
Error=::Error,
|
||||
>,
|
||||
S: Sealed,
|
||||
B: Stream<Error=::Error>,
|
||||
B::Item: AsRef<[u8]>,
|
||||
{
|
||||
type ResponseBody = B;
|
||||
type Sealed = Opaque;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -524,7 +524,7 @@ fn no_proto_empty_parse_eof_does_not_return_error() {
|
||||
.map_err(|_| unreachable!())
|
||||
.and_then(|(item, _incoming)| {
|
||||
let (socket, _) = item.unwrap();
|
||||
Http::new().no_proto(socket, HelloWorld)
|
||||
Http::<hyper::Chunk>::new().serve_connection(socket, HelloWorld)
|
||||
});
|
||||
|
||||
core.run(fut).unwrap();
|
||||
@@ -546,7 +546,7 @@ fn no_proto_nonempty_parse_eof_returns_error() {
|
||||
.map_err(|_| unreachable!())
|
||||
.and_then(|(item, _incoming)| {
|
||||
let (socket, _) = item.unwrap();
|
||||
Http::new().no_proto(socket, HelloWorld)
|
||||
Http::<hyper::Chunk>::new().serve_connection(socket, HelloWorld)
|
||||
.map(|_| ())
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user