feat(lib): convert to use tokio 0.1

BREAKING CHANGE: All uses of `Handle` now need to be new-tokio `Handle`.

Co-authored-by: Sean McArthur <sean@seanmonstar.com>
This commit is contained in:
Sam Reis
2018-03-15 10:46:03 +11:00
committed by Sean McArthur
parent 8c52c2dfd3
commit 27b8db3af8
23 changed files with 680 additions and 505 deletions

View File

@@ -208,7 +208,10 @@ where
}
//TODO: replace with `impl Future` when stable
pub(crate) fn send_request_retryable(&mut self, req: Request<B>) -> Box<Future<Item=Response<Body>, Error=(::Error, Option<Request<B>>)>> {
pub(crate) fn send_request_retryable(&mut self, req: Request<B>) -> Box<Future<Item=Response<Body>, Error=(::Error, Option<Request<B>>)> + Send>
where
B: Send,
{
let inner = match self.dispatch.try_send(req) {
Ok(rx) => {
Either::A(rx.then(move |res| {

View File

@@ -9,6 +9,7 @@ use std::error::Error as StdError;
use std::fmt;
use std::io;
use std::mem;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
@@ -18,9 +19,10 @@ use futures::sync::oneshot;
use futures_cpupool::{Builder as CpuPoolBuilder};
use http::Uri;
use http::uri::Scheme;
use net2::TcpBuilder;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio::net::{TcpStream, TcpStreamNew};
use tokio::net::{TcpStream, ConnectFuture};
use super::dns;
use self::http_connector::HttpConnectorBlockingTask;
@@ -30,13 +32,13 @@ use self::http_connector::HttpConnectorBlockingTask;
/// A connector receives a [`Destination`](Destination) describing how a
/// connection should be estabilished, and returns a `Future` of the
/// ready connection.
pub trait Connect {
pub trait Connect: Send + Sync {
/// The connected IO Stream.
type Transport: AsyncRead + AsyncWrite + 'static;
type Transport: AsyncRead + AsyncWrite + Send + 'static;
/// An error occured when trying to connect.
type Error;
/// A Future that will resolve to the connected Transport.
type Future: Future<Item=(Self::Transport, Connected), Error=Self::Error>;
type Future: Future<Item=(Self::Transport, Connected), Error=Self::Error> + Send;
/// Connect to a destination.
fn connect(&self, dst: Destination) -> Self::Future;
}
@@ -133,6 +135,28 @@ impl Connected {
*/
}
fn connect(addr: &SocketAddr, handle: &Handle) -> io::Result<ConnectFuture> {
let builder = match addr {
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
};
if cfg!(windows) {
// Windows requires a socket be bound before calling connect
let any: SocketAddr = match addr {
&SocketAddr::V4(_) => {
([0, 0, 0, 0], 0).into()
},
&SocketAddr::V6(_) => {
([0, 0, 0, 0, 0, 0, 0, 0], 0).into()
}
};
builder.bind(any)?;
}
Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle))
}
/// A connector for the `http` scheme.
///
/// Performs DNS resolution in a thread pool, and then connects over TCP.
@@ -162,7 +186,7 @@ impl HttpConnector {
/// Takes an executor to run blocking tasks on.
#[inline]
pub fn new_with_executor<E: 'static>(executor: E, handle: &Handle) -> HttpConnector
where E: Executor<HttpConnectorBlockingTask>
where E: Executor<HttpConnectorBlockingTask> + Send + Sync
{
HttpConnector {
executor: HttpConnectExecutor(Arc::new(executor)),
@@ -336,7 +360,7 @@ impl fmt::Debug for HttpConnecting {
struct ConnectingTcp {
addrs: dns::IpAddrs,
current: Option<TcpStreamNew>,
current: Option<ConnectFuture>,
}
impl ConnectingTcp {
@@ -352,14 +376,14 @@ impl ConnectingTcp {
err = Some(e);
if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr);
*current = TcpStream::connect(&addr, handle);
*current = connect(&addr, handle)?;
continue;
}
}
}
} else if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr);
self.current = Some(TcpStream::connect(&addr, handle));
self.current = Some(connect(&addr, handle)?);
continue;
}
@@ -393,7 +417,7 @@ mod http_connector {
}
#[derive(Clone)]
struct HttpConnectExecutor(Arc<Executor<HttpConnectorBlockingTask>>);
struct HttpConnectExecutor(Arc<Executor<HttpConnectorBlockingTask> + Send + Sync>);
impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
fn execute(&self, future: oneshot::Execute<dns::Work>) -> Result<(), ExecuteError<oneshot::Execute<dns::Work>>> {
@@ -406,43 +430,44 @@ impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
mod tests {
#![allow(deprecated)]
use std::io;
use tokio::reactor::Core;
use futures::Future;
use tokio::runtime::Runtime;
use super::{Connect, Destination, HttpConnector};
#[test]
fn test_errors_missing_authority() {
let mut core = Core::new().unwrap();
let runtime = Runtime::new().unwrap();
let uri = "/foo/bar?baz".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1, &core.handle());
let connector = HttpConnector::new(1, runtime.handle());
assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
#[test]
fn test_errors_enforce_http() {
let mut core = Core::new().unwrap();
let runtime = Runtime::new().unwrap();
let uri = "https://example.domain/foo/bar?baz".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1, &core.handle());
let connector = HttpConnector::new(1, runtime.handle());
assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
#[test]
fn test_errors_missing_scheme() {
let mut core = Core::new().unwrap();
let runtime = Runtime::new().unwrap();
let uri = "example.domain".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1, &core.handle());
let connector = HttpConnector::new(1, runtime.handle());
assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
}

View File

@@ -28,7 +28,8 @@ pub struct Sender<T, U> {
// response have been fully processed, and a connection is ready
// for more.
giver: signal::Giver,
inner: mpsc::Sender<(T, Callback<T, U>)>,
//inner: mpsc::Sender<(T, Callback<T, U>)>,
inner: mpsc::Sender<Envelope<T, U>>,
}
impl<T, U> Sender<T, U> {
@@ -51,21 +52,22 @@ impl<T, U> Sender<T, U> {
pub fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner.try_send((val, Callback::Retry(tx)))
self.inner.try_send(Envelope(Some((val, Callback::Retry(tx)))))
.map(move |_| rx)
.map_err(|e| e.into_inner().0)
.map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0)
}
pub fn send(&mut self, val: T) -> Result<Promise<U>, T> {
let (tx, rx) = oneshot::channel();
self.inner.try_send((val, Callback::NoRetry(tx)))
self.inner.try_send(Envelope(Some((val, Callback::NoRetry(tx)))))
.map(move |_| rx)
.map_err(|e| e.into_inner().0)
.map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0)
}
}
pub struct Receiver<T, U> {
inner: mpsc::Receiver<(T, Callback<T, U>)>,
//inner: mpsc::Receiver<(T, Callback<T, U>)>,
inner: mpsc::Receiver<Envelope<T, U>>,
taker: signal::Taker,
}
@@ -75,7 +77,9 @@ impl<T, U> Stream for Receiver<T, U> {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.inner.poll() {
Ok(Async::Ready(item)) => Ok(Async::Ready(item)),
Ok(Async::Ready(item)) => Ok(Async::Ready(item.map(|mut env| {
env.0.take().expect("envelope not dropped")
}))),
Ok(Async::NotReady) => {
self.taker.want();
Ok(Async::NotReady)
@@ -85,6 +89,16 @@ impl<T, U> Stream for Receiver<T, U> {
}
}
/*
TODO: with futures 0.2, bring this Drop back and toss Envelope
The problem is, there is a bug in futures 0.1 mpsc channel, where
even though you may call `rx.close()`, `rx.poll()` may still think
there are messages and so should park the current task. In futures
0.2, we can use `try_next`, and not even risk such a bug.
For now, use an `Envelope` that has this drop guard logic instead.
impl<T, U> Drop for Receiver<T, U> {
fn drop(&mut self) {
self.taker.cancel();
@@ -105,6 +119,17 @@ impl<T, U> Drop for Receiver<T, U> {
}
}
*/
struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
impl<T, U> Drop for Envelope<T, U> {
fn drop(&mut self) {
if let Some((val, cb)) = self.0.take() {
let _ = cb.send(Err((::Error::new_canceled(None::<::Error>), Some(val))));
}
}
}
pub enum Callback<T, U> {
Retry(oneshot::Sender<Result<U, (::Error, Option<T>)>>),

View File

@@ -3,7 +3,6 @@
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
@@ -13,6 +12,7 @@ use http::{Method, Request, Response, Uri, Version};
use http::header::{Entry, HeaderValue, HOST};
use http::uri::Scheme;
use tokio::reactor::Handle;
use tokio_executor::spawn;
pub use tokio_service::Service;
use proto::body::{Body, Entity};
@@ -36,7 +36,7 @@ mod tests;
/// A Client to make outgoing HTTP requests.
pub struct Client<C, B = proto::Body> {
connector: Rc<C>,
connector: Arc<C>,
executor: Exec,
h1_writev: bool,
pool: Pool<PoolClient<B>>,
@@ -52,6 +52,12 @@ impl Client<HttpConnector, proto::Body> {
}
}
impl Default for Client<HttpConnector, proto::Body> {
fn default() -> Client<HttpConnector, proto::Body> {
Client::new(&Handle::current())
}
}
impl Client<HttpConnector, proto::Body> {
/// Configure a Client.
///
@@ -59,11 +65,11 @@ impl Client<HttpConnector, proto::Body> {
///
/// ```no_run
/// # extern crate hyper;
/// # extern crate tokio_core;
/// # extern crate tokio;
///
/// # fn main() {
/// # let core = tokio_core::reactor::Core::new().unwrap();
/// # let handle = core.handle();
/// # let runtime = tokio::runtime::Runtime::new().unwrap();
/// # let handle = runtime.handle();
/// let client = hyper::Client::configure()
/// .keep_alive(true)
/// .build(&handle);
@@ -77,22 +83,10 @@ impl Client<HttpConnector, proto::Body> {
}
impl<C, B> Client<C, B> {
// Eventually, a Client won't really care about a tokio Handle, and only
// the executor used to spawn background tasks. Removing this method is
// a breaking change, so for now, it's just deprecated.
#[doc(hidden)]
#[deprecated]
pub fn handle(&self) -> &Handle {
match self.executor {
Exec::Handle(ref h) => h,
Exec::Executor(..) => panic!("Client not built with a Handle"),
}
}
#[inline]
fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> {
Client {
connector: Rc::new(config.connector),
connector: Arc::new(config.connector),
executor: exec,
h1_writev: config.h1_writev,
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
@@ -103,10 +97,11 @@ impl<C, B> Client<C, B> {
}
impl<C, B> Client<C, B>
where C: Connect<Error=io::Error> + 'static,
where C: Connect<Error=io::Error> + Sync + 'static,
C::Transport: 'static,
C::Future: 'static,
B: Entity<Error=::Error> + 'static,
B: Entity<Error=::Error> + Send + 'static,
B::Data: Send,
{
/// Send a `GET` request to the supplied `Uri`.
@@ -195,7 +190,7 @@ where C: Connect<Error=io::Error> + 'static,
}
//TODO: replace with `impl Future` when stable
fn send_request(&self, mut req: Request<B>, domain: &str) -> Box<Future<Item=Response<Body>, Error=ClientError<B>>> {
fn send_request(&self, mut req: Request<B>, domain: &str) -> Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send> {
let url = req.uri().clone();
let checkout = self.pool.checkout(domain);
let connect = {
@@ -280,16 +275,15 @@ where C: Connect<Error=io::Error> + 'static,
}
fn schedule_pool_timer(&self) {
if let Exec::Handle(ref h) = self.executor {
self.pool.spawn_expired_interval(h);
}
self.pool.spawn_expired_interval(&self.executor);
}
}
impl<C, B> Service for Client<C, B>
where C: Connect<Error=io::Error> + 'static,
C::Future: 'static,
B: Entity<Error=::Error> + 'static,
B: Entity<Error=::Error> + Send + 'static,
B::Data: Send,
{
type Request = Request<B>;
type Response = Response<Body>;
@@ -323,7 +317,7 @@ impl<C, B> fmt::Debug for Client<C, B> {
/// A `Future` that will resolve to an HTTP Response.
#[must_use = "futures do nothing unless polled"]
pub struct FutureResponse(Box<Future<Item=Response<Body>, Error=::Error> + 'static>);
pub struct FutureResponse(Box<Future<Item=Response<Body>, Error=::Error> + Send + 'static>);
impl fmt::Debug for FutureResponse {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -343,7 +337,7 @@ impl Future for FutureResponse {
struct RetryableSendRequest<C, B> {
client: Client<C, B>,
domain: String,
future: Box<Future<Item=Response<Body>, Error=ClientError<B>>>,
future: Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send>,
uri: Uri,
}
@@ -351,7 +345,8 @@ impl<C, B> Future for RetryableSendRequest<C, B>
where
C: Connect<Error=io::Error> + 'static,
C::Future: 'static,
B: Entity<Error=::Error> + 'static,
B: Entity<Error=::Error> + Send + 'static,
B::Data: Send,
{
type Item = Response<Body>;
type Error = ::Error;
@@ -562,12 +557,13 @@ impl<C, B> Config<C, B>
where C: Connect<Error=io::Error>,
C::Transport: 'static,
C::Future: 'static,
B: Entity<Error=::Error>,
B: Entity<Error=::Error> + Send,
B::Data: Send,
{
/// Construct the Client with this configuration.
#[inline]
pub fn build(self, handle: &Handle) -> Client<C, B> {
Client::configured(self, Exec::Handle(handle.clone()))
pub fn build(self) -> Client<C, B> {
Client::configured(self, Exec::Default)
}
/// Construct a Client with this configuration and an executor.
@@ -576,14 +572,16 @@ where C: Connect<Error=io::Error>,
/// to drive requests and responses.
pub fn executor<E>(self, executor: E) -> Client<C, B>
where
E: Executor<Background> + 'static,
E: Executor<Background> + Send + Sync + 'static,
{
Client::configured(self, Exec::Executor(Rc::new(executor)))
Client::configured(self, Exec::new(executor))
}
}
impl<B> Config<UseDefaultConnector, B>
where B: Entity<Error=::Error>,
where
B: Entity<Error=::Error> + Send,
B::Data: Send,
{
/// Construct the Client with this configuration.
#[inline]
@@ -592,7 +590,22 @@ where B: Entity<Error=::Error>,
if self.keep_alive {
connector.set_keepalive(self.keep_alive_timeout);
}
self.connector(connector).build(handle)
self.connector(connector).build()
}
/// Construct a Client with this configuration and an executor.
///
/// The executor will be used to spawn "background" connection tasks
/// to drive requests and responses.
pub fn build_with_executor<E>(self, handle: &Handle, executor: E) -> Client<HttpConnector, B>
where
E: Executor<Background> + Send + Sync + 'static,
{
let mut connector = HttpConnector::new(4, handle);
if self.keep_alive {
connector.set_keepalive(self.keep_alive_timeout);
}
self.connector(connector).executor(executor)
}
}
@@ -622,18 +635,22 @@ impl<C: Clone, B> Clone for Config<C, B> {
#[derive(Clone)]
enum Exec {
Handle(Handle),
Executor(Rc<Executor<Background>>),
Default,
Executor(Arc<Executor<Background> + Send + Sync>),
}
impl Exec {
pub(crate) fn new<E: Executor<Background> + Send + Sync + 'static>(executor: E) -> Exec {
Exec::Executor(Arc::new(executor))
}
fn execute<F>(&self, fut: F) -> io::Result<()>
where
F: Future<Item=(), Error=()> + 'static,
F: Future<Item=(), Error=()> + Send + 'static,
{
match *self {
Exec::Handle(ref h) => h.spawn(fut),
Exec::Default => spawn(fut),
Exec::Executor(ref e) => {
e.execute(bg(Box::new(fut)))
.map_err(|err| {
@@ -660,10 +677,10 @@ mod background {
// and only implementeds `Future`.
#[allow(missing_debug_implementations)]
pub struct Background {
inner: Box<Future<Item=(), Error=()>>,
inner: Box<Future<Item=(), Error=()> + Send>,
}
pub fn bg(fut: Box<Future<Item=(), Error=()>>) -> Background {
pub fn bg(fut: Box<Future<Item=(), Error=()> + Send>) -> Background {
Background {
inner: fut,
}

View File

@@ -6,7 +6,9 @@ use std::time::{Duration, Instant};
use futures::{Future, Async, Poll, Stream};
use futures::sync::oneshot;
use tokio::reactor::{Handle, Interval};
use futures_timer::Interval;
use super::Exec;
pub struct Pool<T> {
inner: Arc<Mutex<PoolInner<T>>>,
@@ -218,8 +220,8 @@ impl<T: Closed> PoolInner<T> {
}
impl<T: Closed + 'static> Pool<T> {
pub(super) fn spawn_expired_interval(&self, handle: &Handle) {
impl<T: Closed + Send + 'static> Pool<T> {
pub(super) fn spawn_expired_interval(&self, exec: &Exec) {
let dur = {
let mut inner = self.inner.lock().unwrap();
@@ -239,12 +241,11 @@ impl<T: Closed + 'static> Pool<T> {
}
};
let interval = Interval::new(dur, handle)
.expect("reactor is gone");
handle.spawn(IdleInterval {
let interval = Interval::new(dur);
exec.execute(IdleInterval {
interval: interval,
pool: Arc::downgrade(&self.inner),
});
}).unwrap();
}
}
@@ -431,7 +432,7 @@ mod tests {
use std::time::Duration;
use futures::{Async, Future};
use futures::future;
use super::{Closed, Pool};
use super::{Closed, Pool, Exec};
impl Closed for i32 {
fn is_closed(&self) -> bool {
@@ -489,9 +490,11 @@ mod tests {
#[test]
fn test_pool_timer_removes_expired() {
let mut core = ::tokio::reactor::Core::new().unwrap();
let runtime = ::tokio::runtime::Runtime::new().unwrap();
let pool = Pool::new(true, Some(Duration::from_millis(100)));
pool.spawn_expired_interval(&core.handle());
let executor = runtime.executor();
pool.spawn_expired_interval(&Exec::new(executor));
let key = Arc::new("foo".to_string());
pool.pooled(key.clone(), 41);
@@ -500,11 +503,9 @@ mod tests {
assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
let timeout = ::tokio::reactor::Timeout::new(
Duration::from_millis(400), // allow for too-good resolution
&core.handle()
).unwrap();
core.run(timeout).unwrap();
::futures_timer::Delay::new(
Duration::from_millis(400) // allow for too-good resolution
).wait().unwrap();
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
}

View File

@@ -1,8 +1,11 @@
extern crate pretty_env_logger;
use std::thread;
use std::time::Duration;
use futures::Async;
use futures::future::poll_fn;
use tokio::reactor::Core;
use tokio::executor::thread_pool::{Builder as ThreadPoolBuilder};
use mock::MockConnector;
use super::*;
@@ -10,8 +13,8 @@ use super::*;
#[test]
fn retryable_request() {
let _ = pretty_env_logger::try_init();
let mut core = Core::new().unwrap();
let executor = ThreadPoolBuilder::new().pool_size(1).build();
let mut connector = MockConnector::new();
let sock1 = connector.mock("http://mock.local");
@@ -19,8 +22,7 @@ fn retryable_request() {
let client = Client::configure()
.connector(connector)
.build(&core.handle());
.executor(executor.sender().clone());
{
@@ -34,7 +36,7 @@ fn retryable_request() {
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
});
core.run(res1.join(srv1)).expect("res1");
res1.join(srv1).wait().expect("res1");
}
drop(sock1);
@@ -52,22 +54,21 @@ fn retryable_request() {
Ok(Async::Ready(()))
});
core.run(res2.join(srv2)).expect("res2");
res2.join(srv2).wait().expect("res2");
}
#[test]
fn conn_reset_after_write() {
let _ = pretty_env_logger::try_init();
let mut core = Core::new().unwrap();
let executor = ThreadPoolBuilder::new().pool_size(1).build();
let mut connector = MockConnector::new();
let sock1 = connector.mock("http://mock.local");
let client = Client::configure()
.connector(connector)
.build(&core.handle());
.executor(executor.sender().clone());
{
let req = Request::builder()
@@ -82,9 +83,12 @@ fn conn_reset_after_write() {
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
});
core.run(res1.join(srv1)).expect("res1");
res1.join(srv1).wait().expect("res1");
}
// sleep to allow some time for the connection to return to the pool
thread::sleep(Duration::from_secs(1));
let req = Request::builder()
.uri("http://mock.local/a")
.body(Default::default())
@@ -102,7 +106,7 @@ fn conn_reset_after_write() {
sock1.take();
Ok(Async::Ready(()))
});
let err = core.run(res2.join(srv2)).expect_err("res2");
let err = res2.join(srv2).wait().expect_err("res2");
match err {
::Error::Incomplete => (),
other => panic!("expected Incomplete, found {:?}", other)

View File

@@ -19,13 +19,16 @@
extern crate bytes;
#[macro_use] extern crate futures;
extern crate futures_cpupool;
extern crate futures_timer;
extern crate http;
extern crate httparse;
extern crate iovec;
#[macro_use] extern crate log;
extern crate net2;
extern crate relay;
extern crate time;
extern crate tokio_core as tokio;
extern crate tokio;
extern crate tokio_executor;
#[macro_use] extern crate tokio_io;
extern crate tokio_service;
extern crate unicase;

View File

@@ -1,8 +1,7 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::cmp;
use std::io::{self, Read, Write};
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use bytes::Buf;
use futures::{Async, Poll};
@@ -284,7 +283,7 @@ impl ::std::ops::Deref for AsyncIo<MockCursor> {
}
pub struct Duplex {
inner: Rc<RefCell<DuplexInner>>,
inner: Arc<Mutex<DuplexInner>>,
}
struct DuplexInner {
@@ -295,21 +294,22 @@ struct DuplexInner {
impl Read for Duplex {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.borrow_mut().read.read(buf)
self.inner.lock().unwrap().read.read(buf)
}
}
impl Write for Duplex {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Some(task) = self.inner.borrow_mut().handle_read_task.take() {
let mut inner = self.inner.lock().unwrap();
if let Some(task) = inner.handle_read_task.take() {
trace!("waking DuplexHandle read");
task.notify();
}
self.inner.borrow_mut().write.write(buf)
inner.write.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.borrow_mut().write.flush()
self.inner.lock().unwrap().write.flush()
}
}
@@ -323,20 +323,21 @@ impl AsyncWrite for Duplex {
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Some(task) = self.inner.borrow_mut().handle_read_task.take() {
let mut inner = self.inner.lock().unwrap();
if let Some(task) = inner.handle_read_task.take() {
task.notify();
}
self.inner.borrow_mut().write.write_buf(buf)
inner.write.write_buf(buf)
}
}
pub struct DuplexHandle {
inner: Rc<RefCell<DuplexInner>>,
inner: Arc<Mutex<DuplexInner>>,
}
impl DuplexHandle {
pub fn read(&self, buf: &mut [u8]) -> Poll<usize, io::Error> {
let mut inner = self.inner.borrow_mut();
let mut inner = self.inner.lock().unwrap();
assert!(buf.len() >= inner.write.inner.len());
if inner.write.inner.is_empty() {
trace!("DuplexHandle read parking");
@@ -348,7 +349,7 @@ impl DuplexHandle {
}
pub fn write(&self, bytes: &[u8]) -> Poll<usize, io::Error> {
let mut inner = self.inner.borrow_mut();
let mut inner = self.inner.lock().unwrap();
assert!(inner.read.inner.vec.is_empty());
assert_eq!(inner.read.inner.pos, 0);
inner
@@ -364,20 +365,20 @@ impl DuplexHandle {
impl Drop for DuplexHandle {
fn drop(&mut self) {
trace!("mock duplex handle drop");
let mut inner = self.inner.borrow_mut();
let mut inner = self.inner.lock().unwrap();
inner.read.close();
inner.write.close();
}
}
pub struct MockConnector {
mocks: RefCell<HashMap<String, Vec<Duplex>>>,
mocks: Mutex<HashMap<String, Vec<Duplex>>>,
}
impl MockConnector {
pub fn new() -> MockConnector {
MockConnector {
mocks: RefCell::new(HashMap::new()),
mocks: Mutex::new(HashMap::new()),
}
}
@@ -392,7 +393,7 @@ impl MockConnector {
inner.read.park_tasks(true);
inner.write.park_tasks(true);
let inner = Rc::new(RefCell::new(inner));
let inner = Arc::new(Mutex::new(inner));
let duplex = Duplex {
inner: inner.clone(),
@@ -401,7 +402,7 @@ impl MockConnector {
inner: inner,
};
self.mocks.borrow_mut().entry(key)
self.mocks.lock().unwrap().entry(key)
.or_insert(Vec::new())
.push(duplex);
@@ -422,7 +423,7 @@ impl Connect for MockConnector {
} else {
"".to_owned()
});
let mut mocks = self.mocks.borrow_mut();
let mut mocks = self.mocks.lock().unwrap();
let mocks = mocks.get_mut(&key)
.expect(&format!("unknown mocks uri: {}", key));
assert!(!mocks.is_empty(), "no additional mocks for {}", key);

View File

@@ -6,20 +6,21 @@
pub mod conn;
mod service;
use std::cell::RefCell;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::rc::{Rc, Weak};
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
use futures::task::{self, Task};
use futures::future::{self};
use futures::{Future, Stream, Poll, Async};
use futures_timer::Delay;
use http::{Request, Response};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::{Core, Handle, Timeout};
use tokio::spawn;
use tokio::reactor::Handle;
use tokio::net::TcpListener;
pub use tokio_service::{NewService, Service};
@@ -54,7 +55,7 @@ where
{
protocol: Http<B::Data>,
new_service: S,
reactor: Core,
handle: Handle,
listener: TcpListener,
shutdown_timeout: Duration,
}
@@ -81,14 +82,25 @@ pub struct SpawnAll<I, S, E> {
/// A stream of connections from binding to an address.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct AddrIncoming {
addr: SocketAddr,
keep_alive_timeout: Option<Duration>,
listener: TcpListener,
handle: Handle,
sleep_on_errors: bool,
timeout: Option<Timeout>,
timeout: Option<Delay>,
}
impl fmt::Debug for AddrIncoming {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("AddrIncoming")
.field("addr", &self.addr)
.field("keep_alive_timeout", &self.keep_alive_timeout)
.field("listener", &self.listener)
.field("handle", &self.handle)
.field("sleep_on_errors", &self.sleep_on_errors)
.finish()
}
}
// ===== impl Http =====
@@ -156,19 +168,39 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
where S: NewService<Request = Request<Body>, Response = Response<Bd>, Error = ::Error> + 'static,
Bd: Entity<Data=B, Error=::Error>,
{
let core = try!(Core::new());
let handle = core.handle();
let listener = try!(TcpListener::bind(addr, &handle));
let handle = Handle::current();
let std_listener = StdTcpListener::bind(addr)?;
let listener = try!(TcpListener::from_std(std_listener, &handle));
Ok(Server {
new_service: new_service,
reactor: core,
handle: handle,
listener: listener,
protocol: self.clone(),
shutdown_timeout: Duration::new(1, 0),
})
}
/// Bind the provided `addr` and return a server with the default `Handle`.
///
/// This is method will bind the `addr` provided with a new TCP listener ready
/// to accept connections. Each connection will be processed with the
/// `new_service` object provided as well, creating a new service per
/// connection.
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
where S: NewService<Request = Request<Body>, Response = Response<Bd>, Error = ::Error>,
Bd: Entity<Data=B, Error=::Error>,
{
let handle = Handle::current();
let std_listener = StdTcpListener::bind(addr)?;
let listener = TcpListener::from_std(std_listener, &handle)?;
let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors)?;
if self.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
Ok(self.serve_incoming(incoming, new_service))
}
/// Bind the provided `addr` and return a server with a shared `Core`.
///
/// This method allows the ability to share a `Core` with multiple servers.
@@ -181,7 +213,8 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
where S: NewService<Request = Request<Body>, Response = Response<Bd>, Error = ::Error>,
Bd: Entity<Data=B, Error=::Error>,
{
let listener = TcpListener::bind(addr, &handle)?;
let std_listener = StdTcpListener::bind(addr)?;
let listener = TcpListener::from_std(std_listener, &handle)?;
let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors)?;
if self.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
@@ -221,17 +254,18 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
/// ```
/// # extern crate futures;
/// # extern crate hyper;
/// # extern crate tokio_core;
/// # extern crate tokio;
/// # extern crate tokio_io;
/// # use futures::Future;
/// # use hyper::{Body, Request, Response};
/// # use hyper::server::{Http, Service};
/// # use tokio_io::{AsyncRead, AsyncWrite};
/// # use tokio_core::reactor::Handle;
/// # fn run<I, S>(some_io: I, some_service: S, some_handle: &Handle)
/// # use tokio::reactor::Handle;
/// # fn run<I, S>(some_io: I, some_service: S)
/// # where
/// # I: AsyncRead + AsyncWrite + 'static,
/// # S: Service<Request=Request<Body>, Response=Response<Body>, Error=hyper::Error> + 'static,
/// # I: AsyncRead + AsyncWrite + Send + 'static,
/// # S: Service<Request=Request<Body>, Response=Response<Body>, Error=hyper::Error> + Send + 'static,
/// # S::Future: Send
/// # {
/// let http = Http::<hyper::Chunk>::new();
/// let conn = http.serve_connection(some_io, some_service);
@@ -240,7 +274,7 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
/// .map(|_| ())
/// .map_err(|e| eprintln!("server connection error: {}", e));
///
/// some_handle.spawn(fut);
/// tokio::spawn(fut);
/// # }
/// # fn main() {}
/// ```
@@ -286,21 +320,38 @@ impl<B> fmt::Debug for Http<B> {
// ===== impl Server =====
/// TODO: add docs
pub struct Run(Box<Future<Item=(), Error=::Error> + Send + 'static>);
impl fmt::Debug for Run {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Run").finish()
}
}
impl Future for Run {
type Item = ();
type Error = ::Error;
fn poll(&mut self) -> Poll<(), ::Error> {
self.0.poll()
}
}
impl<S, B> Server<S, B>
where S: NewService<Request = Request<Body>, Response = Response<B>, Error = ::Error> + 'static,
B: Entity<Error=::Error> + 'static,
where S: NewService<Request = Request<Body>, Response = Response<B>, Error = ::Error> + Send + 'static,
<S as NewService>::Instance: Send,
<<S as NewService>::Instance as Service>::Future: Send,
B: Entity<Error=::Error> + Send + 'static,
B::Data: Send,
{
/// Returns the local address that this server is bound to.
pub fn local_addr(&self) -> ::Result<SocketAddr> {
Ok(try!(self.listener.local_addr()))
}
/// Returns a handle to the underlying event loop that this server will be
/// running on.
pub fn handle(&self) -> Handle {
self.reactor.handle()
}
/// Configure the amount of time this server will wait for a "graceful
/// shutdown".
///
@@ -318,7 +369,7 @@ impl<S, B> Server<S, B>
///
/// This method does not currently return, but it will return an error if
/// one occurs.
pub fn run(self) -> ::Result<()> {
pub fn run(self) -> Run {
self.run_until(future::empty())
}
@@ -335,40 +386,42 @@ impl<S, B> Server<S, B>
/// `shutdown_timeout` time waiting for active connections to shut down.
/// Once the `shutdown_timeout` elapses or all active connections are
/// cleaned out then this method will return.
pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()>
where F: Future<Item = (), Error = ()>,
pub fn run_until<F>(self, shutdown_signal: F) -> Run
where F: Future<Item = (), Error = ()> + Send + 'static,
{
let Server { protocol, new_service, mut reactor, listener, shutdown_timeout } = self;
let Server { protocol, new_service, handle, listener, shutdown_timeout } = self;
let handle = reactor.handle();
let mut incoming = AddrIncoming::new(listener, handle.clone(), protocol.sleep_on_errors)?;
let mut incoming = match AddrIncoming::new(listener, handle.clone(), protocol.sleep_on_errors) {
Ok(incoming) => incoming,
Err(err) => return Run(Box::new(future::err(err.into()))),
};
if protocol.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
// Mini future to track the number of active services
let info = Rc::new(RefCell::new(Info {
let info = Arc::new(Mutex::new(Info {
active: 0,
blocker: None,
}));
// Future for our server's execution
let srv = incoming.for_each(|socket| {
let info_cloned = info.clone();
let srv = incoming.for_each(move |socket| {
let addr = socket.remote_addr;
debug!("accepted new connection ({})", addr);
let service = new_service.new_service()?;
let s = NotifyService {
inner: service,
info: Rc::downgrade(&info),
info: Arc::downgrade(&info_cloned),
};
info.borrow_mut().active += 1;
info_cloned.lock().unwrap().active += 1;
let fut = protocol.serve_connection(socket, s)
.map(|_| ())
.map_err(move |err| error!("server connection error: ({}) {}", addr, err));
handle.spawn(fut);
spawn(fut);
Ok(())
});
@@ -383,24 +436,30 @@ impl<S, B> Server<S, B>
//
// When we get a shutdown signal (`Ok`) then we drop the TCP listener to
// stop accepting incoming connections.
match reactor.run(shutdown_signal.select(srv)) {
Ok(((), _incoming)) => {}
Err((e, _other)) => return Err(e.into()),
}
let main_execution = shutdown_signal.select(srv).then(move |result| {
match result {
Ok(((), _incoming)) => {},
Err((e, _other)) => return future::Either::A(future::err(e.into()))
}
// Ok we've stopped accepting new connections at this point, but we want
// to give existing connections a chance to clear themselves out. Wait
// at most `shutdown_timeout` time before we just return clearing
// everything out.
//
// Our custom `WaitUntilZero` will resolve once all services constructed
// here have been destroyed.
let timeout = try!(Timeout::new(shutdown_timeout, &handle));
let wait = WaitUntilZero { info: info.clone() };
match reactor.run(wait.select(timeout)) {
Ok(_) => Ok(()),
Err((e, _)) => Err(e.into())
}
// Ok we've stopped accepting new connections at this point, but we want
// to give existing connections a chance to clear themselves out. Wait
// at most `shutdown_timeout` time before we just return clearing
// everything out.
//
// Our custom `WaitUntilZero` will resolve once all services constructed
// here have been destroyed.
let timeout = Delay::new(shutdown_timeout);
let wait = WaitUntilZero { info: info.clone() };
future::Either::B(wait.select(timeout).then(|result| {
match result {
Ok(_) => Ok(()),
Err((e, _)) => Err(e.into())
}
}))
});
Run(Box::new(main_execution))
}
}
@@ -537,8 +596,8 @@ impl Stream for AddrIncoming {
}
self.timeout = None;
loop {
match self.listener.accept() {
Ok((socket, addr)) => {
match self.listener.poll_accept() {
Ok(Async::Ready((socket, addr))) => {
if let Some(dur) = self.keep_alive_timeout {
if let Err(e) = socket.set_keepalive(Some(dur)) {
trace!("error trying to set TCP keepalive: {}", e);
@@ -546,7 +605,7 @@ impl Stream for AddrIncoming {
}
return Ok(Async::Ready(Some(AddrStream::new(socket, addr))));
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(ref e) if self.sleep_on_errors => {
// Connection errors can be ignored directly, continue by
// accepting the next request.
@@ -557,8 +616,7 @@ impl Stream for AddrIncoming {
let delay = ::std::time::Duration::from_millis(10);
debug!("accept error: {}; sleeping {:?}",
e, delay);
let mut timeout = Timeout::new(delay, &self.handle)
.expect("can always set a timeout");
let mut timeout = Delay::new(delay);
let result = timeout.poll()
.expect("timeout never fails");
match result {
@@ -660,11 +718,11 @@ mod addr_stream {
struct NotifyService<S> {
inner: S,
info: Weak<RefCell<Info>>,
info: Weak<Mutex<Info>>,
}
struct WaitUntilZero {
info: Rc<RefCell<Info>>,
info: Arc<Mutex<Info>>,
}
struct Info {
@@ -689,7 +747,7 @@ impl<S> Drop for NotifyService<S> {
Some(info) => info,
None => return,
};
let mut info = info.borrow_mut();
let mut info = info.lock().unwrap();
info.active -= 1;
if info.active == 0 {
if let Some(task) = info.blocker.take() {
@@ -704,7 +762,7 @@ impl Future for WaitUntilZero {
type Error = io::Error;
fn poll(&mut self) -> Poll<(), io::Error> {
let mut info = self.info.borrow_mut();
let mut info = self.info.lock().unwrap();
if info.active == 0 {
Ok(().into())
} else {