feat(service): add poll_ready to Service and MakeService (#1767)

This commit is contained in:
Yusuke Sasaki
2019-02-28 02:30:52 +09:00
committed by Sean McArthur
parent ce2b540f9d
commit 0bf30ccc68
6 changed files with 84 additions and 18 deletions

View File

@@ -189,7 +189,7 @@ where
// can dispatch receive, or does it still care about, an incoming message?
match self.dispatch.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"),
Ok(Async::NotReady) => return Ok(Async::NotReady), // service might not be ready
Err(()) => {
trace!("dispatch no longer receiving messages");
self.close();
@@ -410,7 +410,11 @@ where
if self.in_flight.is_some() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
self.service.poll_ready()
.map_err(|_e| {
// FIXME: return error value.
trace!("service closed");
})
}
}

View File

@@ -128,19 +128,37 @@ where
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
E: H2Exec<S::Future, B>,
{
while let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) {
trace!("incoming request");
let content_length = content_length_parse_all(req.headers());
let req = req.map(|stream| {
::Body::h2(stream, content_length)
});
let fut = H2Stream::new(service.call(req), respond);
exec.execute_h2stream(fut)?;
}
loop {
// At first, polls the readiness of supplied service.
match service.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => {
// use `poll_close` instead of `poll`, in order to avoid accepting a request.
try_ready!(self.conn.poll_close().map_err(::Error::new_h2));
trace!("incoming connection complete");
return Ok(Async::Ready(()));
}
Err(err) => {
trace!("service closed");
return Err(::Error::new_user_service(err));
}
}
// no more incoming streams...
trace!("incoming connection complete");
Ok(Async::Ready(()))
// When the service is ready, accepts an incoming request.
if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) {
trace!("incoming request");
let content_length = content_length_parse_all(req.headers());
let req = req.map(|stream| {
::Body::h2(stream, content_length)
});
let fut = H2Stream::new(service.call(req), respond);
exec.execute_h2stream(fut)?;
} else {
// no more incoming streams...
trace!("incoming connection complete");
return Ok(Async::Ready(()))
}
}
}
}

View File

@@ -646,6 +646,15 @@ where
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.make_service.poll_ready_ref() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
trace!("make_service closed");
return Err(::Error::new_user_new_service(e));
}
}
if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) {
let new_fut = self.make_service.make_service_ref(&io);
Ok(Async::Ready(Some(Connecting {

View File

@@ -1,7 +1,7 @@
use std::error::Error as StdError;
use std::fmt;
use futures::{Future, IntoFuture};
use futures::{Async, Future, IntoFuture, Poll};
use body::Payload;
use super::Service;
@@ -30,6 +30,15 @@ pub trait MakeService<Ctx> {
/// The error type that can be returned when creating a new `Service`.
type MakeError: Into<Box<StdError + Send + Sync>>;
/// Returns `Ready` when the constructor is ready to create a new `Service`.
///
/// The implementation of this method is allowed to return a `Ready` even if
/// the factory is not ready to create a new service. In this case, the future
/// returned from `make_service` will resolve to an error.
fn poll_ready(&mut self) -> Poll<(), Self::MakeError> {
Ok(Async::Ready(()))
}
/// Create a new `Service`.
fn make_service(&mut self, ctx: Ctx) -> Self::Future;
}
@@ -46,7 +55,8 @@ pub trait MakeServiceRef<Ctx>: self::sealed::Sealed<Ctx> {
ResBody=Self::ResBody,
Error=Self::Error,
>;
type Future: Future<Item=Self::Service>;
type MakeError: Into<Box<StdError + Send + Sync>>;
type Future: Future<Item=Self::Service, Error=Self::MakeError>;
// Acting like a #[non_exhaustive] for associated types of this trait.
//
@@ -59,6 +69,8 @@ pub trait MakeServiceRef<Ctx>: self::sealed::Sealed<Ctx> {
// if necessary.
type __DontNameMe: self::sealed::CantImpl;
fn poll_ready_ref(&mut self) -> Poll<(), Self::MakeError>;
fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future;
}
@@ -76,10 +88,15 @@ where
type Service = S;
type ReqBody = IB;
type ResBody = OB;
type MakeError = ME;
type Future = F;
type __DontNameMe = self::sealed::CantName;
fn poll_ready_ref(&mut self) -> Poll<(), Self::MakeError> {
self.poll_ready()
}
fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future {
self.make_service(ctx)
}

View File

@@ -1,6 +1,6 @@
use std::error::Error as StdError;
use futures::{Future, IntoFuture};
use futures::{Async, Future, IntoFuture, Poll};
use body::Payload;
use super::{MakeService, Service};
@@ -29,6 +29,11 @@ pub trait NewService {
/// The error type that can be returned when creating a new `Service`.
type InitError: Into<Box<StdError + Send + Sync>>;
#[doc(hidden)]
fn poll_ready(&mut self) -> Poll<(), Self::InitError> {
Ok(Async::Ready(()))
}
/// Create a new `Service`.
fn new_service(&self) -> Self::Future;
}
@@ -63,6 +68,10 @@ where
type Future = N::Future;
type MakeError = N::InitError;
fn poll_ready(&mut self) -> Poll<(), Self::MakeError> {
NewService::poll_ready(self)
}
fn make_service(&mut self, _: Ctx) -> Self::Future {
self.new_service()
}

View File

@@ -2,7 +2,7 @@ use std::error::Error as StdError;
use std::fmt;
use std::marker::PhantomData;
use futures::{future, Future, IntoFuture};
use futures::{future, Async, Future, IntoFuture, Poll};
use body::Payload;
use common::Never;
@@ -26,6 +26,15 @@ pub trait Service {
/// The `Future` returned by this `Service`.
type Future: Future<Item=Response<Self::ResBody>, Error=Self::Error>;
/// Returns `Ready` when the service is able to process requests.
///
/// The implementation of this method is allowed to return a `Ready` even if
/// the service is not ready to process. In this case, the future returned
/// from `call` will resolve to an error.
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
/// Calls this `Service` with a request, returning a `Future` of the response.
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future;
}