use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; use futures::{Async, Future, Poll, Stream}; use futures::executor::{self, Notify}; use tokio_executor::{enter, EnterError}; pub(crate) fn timeout(fut: F, timeout: Option) -> Result> where F: Future, { let mut spawn = executor::spawn(fut); block_on(timeout, |notify| { spawn.poll_future_notify(notify, 0) }) } pub(crate) fn stream(stream: S, timeout: Option) -> WaitStream where S: Stream { WaitStream { stream: executor::spawn(stream), timeout, } } #[derive(Debug)] pub(crate) enum Waited { TimedOut, Executor(EnterError), Inner(E), } impl From for Waited { fn from(err: E) -> Waited { Waited::Inner(err) } } pub(crate) struct WaitStream { stream: executor::Spawn, timeout: Option, } impl Iterator for WaitStream where S: Stream { type Item = Result>; fn next(&mut self) -> Option { let res = block_on(self.timeout, |notify| { self.stream.poll_stream_notify(notify, 0) }); match res { Ok(Some(val)) => Some(Ok(val)), Ok(None) => None, Err(err) => Some(Err(err)), } } } struct ThreadNotify { thread: thread::Thread, } impl Notify for ThreadNotify { fn notify(&self, _id: usize) { self.thread.unpark(); } } fn block_on(timeout: Option, mut poll: F) -> Result> where F: FnMut(&Arc) -> Poll, { let _entered = enter().map_err(Waited::Executor)?; let deadline = timeout.map(|d| { Instant::now() + d }); let notify = Arc::new(ThreadNotify { thread: thread::current(), }); loop { match poll(¬ify)? { Async::Ready(val) => return Ok(val), Async::NotReady => {} } if let Some(deadline) = deadline { let now = Instant::now(); if now >= deadline { return Err(Waited::TimedOut); } thread::park_timeout(deadline - now); } else { thread::park(); } } }