use std::fmt; use std::io; use std::net::{SocketAddr, TcpListener as StdTcpListener}; use std::time::{Duration, Instant}; use futures::{Async, Future, Poll, Stream}; use tokio_reactor::Handle; use tokio_tcp::TcpListener; use tokio_timer::Delay; pub use self::addr_stream::AddrStream; /// A stream of connections from binding to an address. #[must_use = "streams do nothing unless polled"] pub struct AddrIncoming { addr: SocketAddr, listener: TcpListener, sleep_on_errors: bool, tcp_keepalive_timeout: Option, tcp_nodelay: bool, timeout: Option, } impl AddrIncoming { pub(super) fn new(addr: &SocketAddr, handle: Option<&Handle>) -> ::Result { let std_listener = StdTcpListener::bind(addr) .map_err(::Error::new_listen)?; if let Some(handle) = handle { AddrIncoming::from_std(std_listener, handle) } else { let handle = Handle::default(); AddrIncoming::from_std(std_listener, &handle) } } pub(super) fn from_std(std_listener: StdTcpListener, handle: &Handle) -> ::Result { let listener = TcpListener::from_std(std_listener, &handle) .map_err(::Error::new_listen)?; let addr = listener.local_addr().map_err(::Error::new_listen)?; Ok(AddrIncoming { listener, addr: addr, sleep_on_errors: true, tcp_keepalive_timeout: None, tcp_nodelay: false, timeout: None, }) } /// Creates a new `AddrIncoming` binding to provided socket address. pub fn bind(addr: &SocketAddr) -> ::Result { AddrIncoming::new(addr, None) } /// Get the local address bound to this listener. pub fn local_addr(&self) -> SocketAddr { self.addr } /// Set whether TCP keepalive messages are enabled on accepted connections. /// /// If `None` is specified, keepalive is disabled, otherwise the duration /// specified will be the time to remain idle before sending TCP keepalive /// probes. pub fn set_keepalive(&mut self, keepalive: Option) -> &mut Self { self.tcp_keepalive_timeout = keepalive; self } /// Set the value of `TCP_NODELAY` option for accepted connections. pub fn set_nodelay(&mut self, enabled: bool) -> &mut Self { self.tcp_nodelay = enabled; self } /// Set whether to sleep on accept errors. /// /// A possible scenario is that the process has hit the max open files /// allowed, and so trying to accept a new connection will fail with /// `EMFILE`. In some cases, it's preferable to just wait for some time, if /// the application will likely close some files (or connections), and try /// to accept the connection again. If this option is `true`, the error /// will be logged at the `error` level, since it is still a big deal, /// and then the listener will sleep for 1 second. /// /// In other cases, hitting the max open files should be treat similarly /// to being out-of-memory, and simply error (and shutdown). Setting /// this option to `false` will allow that. /// /// Default is `true`. pub fn set_sleep_on_errors(&mut self, val: bool) { self.sleep_on_errors = val; } } impl Stream for AddrIncoming { // currently unnameable... type Item = AddrStream; type Error = ::std::io::Error; fn poll(&mut self) -> Poll, Self::Error> { // Check if a previous timeout is active that was set by IO errors. if let Some(ref mut to) = self.timeout { match to.poll() { Ok(Async::Ready(())) => {} Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { error!("sleep timer error: {}", err); } } } self.timeout = None; loop { match self.listener.poll_accept() { Ok(Async::Ready((socket, addr))) => { if let Some(dur) = self.tcp_keepalive_timeout { if let Err(e) = socket.set_keepalive(Some(dur)) { trace!("error trying to set TCP keepalive: {}", e); } } if let Err(e) = socket.set_nodelay(self.tcp_nodelay) { trace!("error trying to set TCP nodelay: {}", e); } return Ok(Async::Ready(Some(AddrStream::new(socket, addr)))); }, Ok(Async::NotReady) => return Ok(Async::NotReady), Err(e) => { // Connection errors can be ignored directly, continue by // accepting the next request. if is_connection_error(&e) { debug!("accepted connection already errored: {}", e); continue; } if self.sleep_on_errors { // Sleep 1s. let delay = Instant::now() + Duration::from_secs(1); let mut timeout = Delay::new(delay); match timeout.poll() { Ok(Async::Ready(())) => { // Wow, it's been a second already? Ok then... error!("accept error: {}", e); continue }, Ok(Async::NotReady) => { error!("accept error: {}", e); self.timeout = Some(timeout); return Ok(Async::NotReady); }, Err(timer_err) => { error!("couldn't sleep on error, timer error: {}", timer_err); return Err(e); } } } else { return Err(e); } }, } } } } /// This function defines errors that are per-connection. Which basically /// means that if we get this error from `accept()` system call it means /// next connection might be ready to be accepted. /// /// All other errors will incur a timeout before next `accept()` is performed. /// The timeout is useful to handle resource exhaustion errors like ENFILE /// and EMFILE. Otherwise, could enter into tight loop. fn is_connection_error(e: &io::Error) -> bool { match e.kind() { io::ErrorKind::ConnectionRefused | io::ErrorKind::ConnectionAborted | io::ErrorKind::ConnectionReset => true, _ => false, } } impl fmt::Debug for AddrIncoming { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("AddrIncoming") .field("addr", &self.addr) .field("sleep_on_errors", &self.sleep_on_errors) .field("tcp_keepalive_timeout", &self.tcp_keepalive_timeout) .field("tcp_nodelay", &self.tcp_nodelay) .finish() } } mod addr_stream { use std::io::{self, Read, Write}; use std::net::SocketAddr; use bytes::{Buf, BufMut}; use futures::Poll; use tokio_tcp::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; /// A transport returned yieled by `AddrIncoming`. #[derive(Debug)] pub struct AddrStream { inner: TcpStream, pub(super) remote_addr: SocketAddr, } impl AddrStream { pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream { AddrStream { inner: tcp, remote_addr: addr, } } /// Returns the remote (peer) address of this connection. #[inline] pub fn remote_addr(&self) -> SocketAddr { self.remote_addr } } impl Read for AddrStream { #[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } } impl Write for AddrStream { #[inline] fn write(&mut self, buf: &[u8]) -> io::Result { self.inner.write(buf) } #[inline] fn flush(&mut self) -> io::Result<()> { // TcpStream::flush is a noop, so skip calling it... Ok(()) } } impl AsyncRead for AddrStream { #[inline] unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } #[inline] fn read_buf(&mut self, buf: &mut B) -> Poll { self.inner.read_buf(buf) } } impl AsyncWrite for AddrStream { #[inline] fn shutdown(&mut self) -> Poll<(), io::Error> { AsyncWrite::shutdown(&mut self.inner) } #[inline] fn write_buf(&mut self, buf: &mut B) -> Poll { self.inner.write_buf(buf) } } }