From b1a90eb402f575244799ec357936bcf56728d722 Mon Sep 17 00:00:00 2001 From: Constantin Nickel Date: Thu, 12 Sep 2019 14:45:37 +0200 Subject: [PATCH] Prune the futures dependencies --- Cargo.toml | 4 +++- src/async_impl/body.rs | 10 +++++----- src/async_impl/decoder.rs | 22 ++++++++++++---------- src/async_impl/multipart.rs | 21 +++++++++++++-------- src/async_impl/request.rs | 2 +- src/async_impl/response.rs | 2 +- src/blocking/client.rs | 10 +++++----- src/blocking/response.rs | 8 ++++---- src/blocking/wait.rs | 6 +++--- src/connect.rs | 2 +- src/dns.rs | 3 ++- tests/client.rs | 9 ++++----- 12 files changed, 54 insertions(+), 45 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d0fc01f..d773e6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,9 @@ all-features = true base64 = "0.10" bytes = "0.4" encoding_rs = "0.8" -futures-preview = { version = "=0.3.0-alpha.18" } +futures-core-preview = { version = "=0.3.0-alpha.18" } +futures-channel-preview = { version = "=0.3.0-alpha.18" } +futures-util-preview = { version = "=0.3.0-alpha.18" } http = "0.1.15" hyper = "=0.13.0-alpha.1" log = "0.4" diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index faa182f..c89c790 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use futures::Stream; +use futures_core::Stream; use hyper::body::Payload; use std::fmt; use std::future::Future; @@ -30,7 +30,7 @@ impl Body { /// /// ``` /// # use reqwest::Body; - /// # use futures; + /// # use futures_util; /// # fn main() { /// let chunks: Vec> = vec![ /// Ok("hello"), @@ -38,14 +38,14 @@ impl Body { /// Ok("world"), /// ]; /// - /// let stream = futures::stream::iter(chunks); + /// let stream = futures_util::stream::iter(chunks); /// /// let body = Body::wrap_stream(stream); /// # } /// ``` pub fn wrap_stream(stream: S) -> Body where - S: futures::TryStream + Send + Sync + 'static, + S: futures_core::stream::TryStream + Send + Sync + 'static, S::Error: Into>, hyper::Chunk: From, { @@ -157,7 +157,7 @@ impl Stream for ImplStream { return Poll::Ready(Some(Err(crate::error::timedout(None)))); } } - futures::ready!(Pin::new(body).poll_data(cx)) + futures_core::ready!(Pin::new(body).poll_data(cx)) .map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::from)) } Inner::Reusable(ref mut bytes) => { diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index b1eb814..ece367e 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -19,8 +19,8 @@ use std::task::{Context, Poll}; use async_compression::stream::GzipDecoder; use bytes::Bytes; -use futures::stream::Peekable; -use futures::Stream; +use futures_core::Stream; +use futures_util::stream::Peekable; use hyper::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; use hyper::HeaderMap; @@ -80,7 +80,7 @@ impl Decoder { /// /// This decoder will buffer and decompress chunks that are gzipped. fn gzip(body: Body) -> Decoder { - use futures::stream::StreamExt; + use futures_util::StreamExt; Decoder { inner: Inner::Pending(Pending(IoStream(body.into_stream()).peekable())), @@ -142,7 +142,7 @@ impl Stream for Decoder { }, Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx), Inner::Gzip(ref mut decoder) => { - return match futures::ready!(Pin::new(decoder).poll_next(cx)) { + return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::from_io(err)))), None => Poll::Ready(None), @@ -159,17 +159,19 @@ impl Future for Pending { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures::stream::StreamExt; + use futures_util::StreamExt; - match futures::ready!(Pin::new(&mut self.0).peek(cx)) { + match futures_core::ready!(Pin::new(&mut self.0).peek(cx)) { Some(Ok(_)) => { // fallthrough } Some(Err(_e)) => { // error was just a ref, so we need to really poll to move it - return Poll::Ready(Err(futures::ready!(Pin::new(&mut self.0).poll_next(cx)) - .expect("just peeked Some") - .unwrap_err())); + return Poll::Ready(Err(futures_core::ready!( + Pin::new(&mut self.0).poll_next(cx) + ) + .expect("just peeked Some") + .unwrap_err())); } None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), }; @@ -186,7 +188,7 @@ impl Stream for IoStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match futures::ready!(Pin::new(&mut self.0).poll_next(cx)) { + match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), None => Poll::Ready(None), diff --git a/src/async_impl/multipart.rs b/src/async_impl/multipart.rs index b360035..1294385 100644 --- a/src/async_impl/multipart.rs +++ b/src/async_impl/multipart.rs @@ -7,7 +7,7 @@ use mime_guess::Mime; use percent_encoding::{self, AsciiSet, NON_ALPHANUMERIC}; use uuid::Uuid; -use futures::StreamExt; +use futures_util::StreamExt; use super::Body; @@ -473,7 +473,8 @@ impl PercentEncoding { #[cfg(test)] mod tests { use super::*; - use futures::TryStreamExt; + use futures_util::TryStreamExt; + use futures_util::{future, stream}; use tokio; #[test] @@ -493,17 +494,21 @@ mod tests { let mut form = Form::new() .part( "reader1", - Part::stream(Body::wrap_stream(futures::stream::once( - futures::future::ready::>(Ok("part1".to_owned())), - ))), + Part::stream(Body::wrap_stream(stream::once(future::ready::< + Result, + >(Ok( + "part1".to_owned(), + ))))), ) .part("key1", Part::text("value1")) .part("key2", Part::text("value2").mime(mime::IMAGE_BMP)) .part( "reader2", - Part::stream(Body::wrap_stream(futures::stream::once( - futures::future::ready::>(Ok("part2".to_owned())), - ))), + Part::stream(Body::wrap_stream(stream::once(future::ready::< + Result, + >(Ok( + "part2".to_owned(), + ))))), ) .part("key3", Part::text("value3").file_name("filename")); form.inner.boundary = "boundary".to_string(); diff --git a/src/async_impl/request.rs b/src/async_impl/request.rs index 18f9f48..e81418c 100644 --- a/src/async_impl/request.rs +++ b/src/async_impl/request.rs @@ -1,7 +1,7 @@ use std::fmt; +use std::future::Future; use base64::encode; -use futures::Future; use serde::Serialize; use serde_json; use serde_urlencoded; diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index f7d285f..f9efc06 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -4,7 +4,7 @@ use std::net::SocketAddr; use bytes::Bytes; use encoding_rs::{Encoding, UTF_8}; -use futures::{StreamExt, TryStreamExt}; +use futures_util::{StreamExt, TryStreamExt}; use http; use hyper::client::connect::HttpInfo; use hyper::header::CONTENT_LENGTH; diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 59dc3b5..1edcc51 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -5,8 +5,8 @@ use std::sync::Arc; use std::thread; use std::time::Duration; -use futures::channel::{mpsc, oneshot}; -use futures::{StreamExt, TryFutureExt}; +use futures_channel::{mpsc, oneshot}; +use futures_util::{StreamExt, TryFutureExt}; use log::{error, trace}; @@ -652,15 +652,15 @@ where { use std::task::Poll; - futures::pin_mut!(fut); + futures_util::pin_mut!(fut); // "select" on the sender being canceled, and the future completing - let res = futures::future::poll_fn(|cx| { + let res = futures_util::future::poll_fn(|cx| { match fut.as_mut().poll(cx) { Poll::Ready(val) => Poll::Ready(Some(val)), Poll::Pending => { // check if the callback is canceled - futures::ready!(tx.poll_cancel(cx)); + futures_core::ready!(tx.poll_cancel(cx)); Poll::Ready(None) } } diff --git a/src/blocking/response.rs b/src/blocking/response.rs index 49a5b22..cefc15a 100644 --- a/src/blocking/response.rs +++ b/src/blocking/response.rs @@ -17,7 +17,7 @@ use crate::{async_impl, StatusCode, Url, Version}; /// A Response to a submitted `Request`. pub struct Response { inner: async_impl::Response, - body: Option>>, + body: Option>>, timeout: Option, _thread_handle: KeepCoreThreadAlive, } @@ -340,8 +340,8 @@ impl Response { // private - fn body_mut(&mut self) -> Pin<&mut dyn futures::io::AsyncRead> { - use futures::stream::TryStreamExt; + fn body_mut(&mut self) -> Pin<&mut dyn futures_util::io::AsyncRead> { + use futures_util::TryStreamExt; if self.body.is_none() { let body = mem::replace(self.inner.body_mut(), async_impl::Decoder::empty()); @@ -355,7 +355,7 @@ impl Response { impl Read for Response { fn read(&mut self, buf: &mut [u8]) -> io::Result { - use futures::io::AsyncReadExt; + use futures_util::io::AsyncReadExt; let timeout = self.timeout; wait::timeout(self.body_mut().read(buf), timeout).map_err(|e| match e { diff --git a/src/blocking/wait.rs b/src/blocking/wait.rs index e96c904..bd00dc1 100644 --- a/src/blocking/wait.rs +++ b/src/blocking/wait.rs @@ -23,10 +23,10 @@ where let mut park = ParkThread::new(); // Arc shouldn't be necessary, since UnparkThread is reference counted internally, // but let's just stay safe for now. - let waker = futures::task::waker(Arc::new(UnparkWaker(park.unpark()))); + let waker = futures_util::task::waker(Arc::new(UnparkWaker(park.unpark()))); let mut cx = Context::from_waker(&waker); - futures::pin_mut!(fut); + futures_util::pin_mut!(fut); loop { match fut.as_mut().poll(&mut cx) { @@ -60,7 +60,7 @@ pub(crate) enum Waited { struct UnparkWaker(UnparkThread); -impl futures::task::ArcWake for UnparkWaker { +impl futures_util::task::ArcWake for UnparkWaker { fn wake_by_ref(arc_self: &Arc) { arc_self.0.unpark(); } diff --git a/src/connect.rs b/src/connect.rs index 88c8875..022db1d 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -1,4 +1,4 @@ -use futures::FutureExt; +use futures_util::FutureExt; use http::uri::Scheme; use hyper::client::connect::{Connect, Connected, Destination}; use tokio::io::{AsyncRead, AsyncWrite}; diff --git a/src/dns.rs b/src/dns.rs index 9d03eee..a76749d 100644 --- a/src/dns.rs +++ b/src/dns.rs @@ -1,8 +1,9 @@ +use std::future::Future; use std::net::IpAddr; use std::sync::{Arc, Mutex, Once}; use std::{io, vec}; -use futures::{future, Future}; +use futures_util::future; use hyper::client::connect::dns as hyper_dns; use tokio; use trust_dns_resolver::{system_conf, AsyncResolver, BackgroundLookupIp}; diff --git a/tests/client.rs b/tests/client.rs index aaa1621..e541f9e 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -85,12 +85,11 @@ async fn response_json() { #[tokio::test] async fn multipart() { + use futures_util::{future, stream}; + let _ = env_logger::try_init(); - let stream = reqwest::Body::wrap_stream(futures::stream::once(futures::future::ready(Ok::< - _, - reqwest::Error, - >( + let stream = reqwest::Body::wrap_stream(stream::once(future::ready(Ok::<_, reqwest::Error>( "part1 part2".to_owned(), )))); let part = Part::stream(stream); @@ -284,7 +283,7 @@ async fn gzip_case(response_size: usize, chunk_size: usize) { async fn body_stream() { let _ = env_logger::try_init(); - let source = futures::stream::iter::>>(vec![ + let source = futures_util::stream::iter::>>(vec![ Ok(Bytes::from_static(b"123")), Ok(Bytes::from_static(b"4567")), ]);