From f71304b44905cdc71f9e8a708156c72878b2f1aa Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 23 Oct 2019 14:29:15 -0700 Subject: [PATCH] refactor(client): use pin_project for Resolve futures --- src/client/connect/dns.rs | 6 ++--- src/client/connect/http.rs | 45 ++++++++++++++++++++------------------ 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/src/client/connect/dns.rs b/src/client/connect/dns.rs index 03e30f09..0160a268 100644 --- a/src/client/connect/dns.rs +++ b/src/client/connect/dns.rs @@ -17,14 +17,14 @@ use std::str::FromStr; use tokio_sync::{mpsc, oneshot}; -use crate::common::{Future, Never, Pin, Poll, Unpin, task}; +use crate::common::{Future, Never, Pin, Poll, task}; /// Resolve a hostname to a set of IP addresses. -pub trait Resolve: Unpin { +pub trait Resolve { /// The set of IP addresses to try to connect to. type Addrs: Iterator; /// A Future of the resolved set of addresses. - type Future: Future> + Unpin; + type Future: Future>; /// Resolve a hostname. fn resolve(&self, name: Name) -> Self::Future; } diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index b67f29a1..1ed5ef8c 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -9,6 +9,7 @@ use std::time::Duration; use http::uri::{Scheme, Uri}; use futures_util::{TryFutureExt, FutureExt}; use net2::TcpBuilder; +use pin_project::{pin_project, project}; use tokio_net::driver::Handle; use tokio_net::tcp::TcpStream; use tokio_timer::{Delay, Timeout}; @@ -359,7 +360,9 @@ impl StdError for InvalidUrl { } /// A Future representing work to connect to a URL. #[must_use = "futures do nothing unless polled"] +#[pin_project] pub struct HttpConnecting { + #[pin] state: State, handle: Option, connect_timeout: Option, @@ -372,61 +375,61 @@ pub struct HttpConnecting { recv_buffer_size: Option, } +#[pin_project] enum State { Lazy(R, String, Option), - Resolving(R::Future, Option), + Resolving(#[pin] R::Future, Option), Connecting(ConnectingTcp), Error(Option), } -impl Future for HttpConnecting -where - R::Future: Unpin, -{ +impl Future for HttpConnecting { type Output = Result<(TcpStream, Connected), io::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let me = &mut *self; + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let mut me = self.project(); loop { let state; - match me.state { - State::Lazy(ref resolver, ref mut host, local_addr) => { + #[project] + match me.state.as_mut().project() { + State::Lazy(ref resolver, ref mut host, ref local_addr) => { // If the host is already an IP addr (v4 or v6), // skip resolving the dns and start connecting right away. - if let Some(addrs) = dns::IpAddrs::try_parse(host, me.port) { + if let Some(addrs) = dns::IpAddrs::try_parse(host, *me.port) { state = State::Connecting(ConnectingTcp::new( - local_addr, addrs, me.connect_timeout, me.happy_eyeballs_timeout, me.reuse_address)); + **local_addr, addrs, *me.connect_timeout, *me.happy_eyeballs_timeout, *me.reuse_address)); } else { let name = dns::Name::new(mem::replace(host, String::new())); - state = State::Resolving(resolver.resolve(name), local_addr); + state = State::Resolving(resolver.resolve(name), **local_addr); } }, - State::Resolving(ref mut future, local_addr) => { - let addrs = ready!(Pin::new(future).poll(cx))?; - let port = me.port; + State::Resolving(future, local_addr) => { + let addrs = ready!(future.poll(cx))?; + let port = *me.port; let addrs = addrs .map(|addr| SocketAddr::new(addr, port)) .collect(); let addrs = dns::IpAddrs::new(addrs); state = State::Connecting(ConnectingTcp::new( - local_addr, addrs, me.connect_timeout, me.happy_eyeballs_timeout, me.reuse_address)); + *local_addr, addrs, *me.connect_timeout, *me.happy_eyeballs_timeout, *me.reuse_address)); }, State::Connecting(ref mut c) => { let sock = ready!(c.poll(cx, &me.handle))?; if let Some(dur) = me.keep_alive_timeout { - sock.set_keepalive(Some(dur))?; + sock.set_keepalive(Some(*dur))?; } if let Some(size) = me.send_buffer_size { - sock.set_send_buffer_size(size)?; + sock.set_send_buffer_size(*size)?; } if let Some(size) = me.recv_buffer_size { - sock.set_recv_buffer_size(size)?; + sock.set_recv_buffer_size(*size)?; } - sock.set_nodelay(me.nodelay)?; + sock.set_nodelay(*me.nodelay)?; let extra = HttpInfo { remote_addr: sock.peer_addr()?, @@ -438,7 +441,7 @@ where }, State::Error(ref mut e) => return Poll::Ready(Err(e.take().expect("polled more than once"))), } - me.state = state; + me.state.set(state); } } }