refactor(server): use AddrIncoming in Server::run

This commit is contained in:
Sean McArthur
2018-02-01 12:24:35 -08:00
parent b14de9eb34
commit 1aa3835660

View File

@@ -364,6 +364,11 @@ impl<S, B> Server<S, B>
let handle = reactor.handle(); let handle = reactor.handle();
let incoming = AddrIncoming {
addr: listener.local_addr()?,
listener: listener,
};
// Mini future to track the number of active services // Mini future to track the number of active services
let info = Rc::new(RefCell::new(Info { let info = Rc::new(RefCell::new(Info {
active: 0, active: 0,
@@ -371,7 +376,8 @@ impl<S, B> Server<S, B>
})); }));
// Future for our server's execution // Future for our server's execution
let srv = listener.incoming().for_each(|(socket, addr)| { let srv = incoming.for_each(|socket| {
let addr = socket.remote_addr;
let addr_service = SocketAddrService::new(addr, new_service.new_service()?); let addr_service = SocketAddrService::new(addr, new_service.new_service()?);
let s = NotifyService { let s = NotifyService {
inner: addr_service, inner: addr_service,
@@ -587,17 +593,18 @@ impl AddrIncoming {
} }
impl Stream for AddrIncoming { impl Stream for AddrIncoming {
// currently unnameable...
type Item = self::addr_stream::AddrStream; type Item = self::addr_stream::AddrStream;
type Error = ::std::io::Error; type Error = ::std::io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop { loop {
match self.listener.accept() { match self.listener.accept() {
Ok((socket, _addr)) => { Ok((socket, addr)) => {
return Ok(Async::Ready(Some(self::addr_stream::new(socket)))); return Ok(Async::Ready(Some(self::addr_stream::new(socket, addr))));
}, },
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
Err(e) => debug!("internal error: {:?}", e), Err(e) => return Err(e),
} }
} }
} }
@@ -605,20 +612,23 @@ impl Stream for AddrIncoming {
mod addr_stream { mod addr_stream {
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::net::SocketAddr;
use bytes::{Buf, BufMut}; use bytes::{Buf, BufMut};
use futures::Poll; use futures::Poll;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
pub fn new(tcp: TcpStream) -> AddrStream { pub fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream {
AddrStream { AddrStream {
inner: tcp, inner: tcp,
remote_addr: addr,
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct AddrStream { pub struct AddrStream {
inner: TcpStream, inner: TcpStream,
pub(super) remote_addr: SocketAddr,
} }
impl Read for AddrStream { impl Read for AddrStream {