Replace futures-channel with tokio::sync in blocking client (#748)
This commit is contained in:
@@ -28,7 +28,7 @@ native-tls-vendored = ["native-tls", "native-tls-crate/vendored"]
|
|||||||
|
|
||||||
rustls-tls = ["hyper-rustls", "tokio-rustls", "webpki-roots", "rustls", "__tls"]
|
rustls-tls = ["hyper-rustls", "tokio-rustls", "webpki-roots", "rustls", "__tls"]
|
||||||
|
|
||||||
blocking = ["futures-channel", "futures-util/io", "tokio/rt-threaded", "tokio/rt-core"]
|
blocking = ["futures-util/io", "tokio/rt-threaded", "tokio/rt-core", "tokio/sync"]
|
||||||
|
|
||||||
cookies = ["cookie_crate", "cookie_store"]
|
cookies = ["cookie_crate", "cookie_store"]
|
||||||
|
|
||||||
@@ -89,9 +89,6 @@ rustls = { version = "0.16", features = ["dangerous_configuration"], optional =
|
|||||||
tokio-rustls = { version = "0.12", optional = true }
|
tokio-rustls = { version = "0.12", optional = true }
|
||||||
webpki-roots = { version = "0.17", optional = true }
|
webpki-roots = { version = "0.17", optional = true }
|
||||||
|
|
||||||
## blocking
|
|
||||||
futures-channel = { version = "0.3.0", optional = true }
|
|
||||||
|
|
||||||
## cookies
|
## cookies
|
||||||
cookie_crate = { version = "0.12", package = "cookie", optional = true }
|
cookie_crate = { version = "0.12", package = "cookie", optional = true }
|
||||||
cookie_store = { version = "0.10", optional = true }
|
cookie_store = { version = "0.10", optional = true }
|
||||||
|
|||||||
@@ -5,10 +5,8 @@ use std::sync::Arc;
|
|||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures_channel::{mpsc, oneshot};
|
|
||||||
use futures_util::{StreamExt, TryFutureExt};
|
|
||||||
|
|
||||||
use log::{error, trace};
|
use log::{error, trace};
|
||||||
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use super::request::{Request, RequestBuilder};
|
use super::request::{Request, RequestBuilder};
|
||||||
use super::response::Response;
|
use super::response::Response;
|
||||||
@@ -562,7 +560,7 @@ impl ClientHandle {
|
|||||||
fn new(builder: ClientBuilder) -> crate::Result<ClientHandle> {
|
fn new(builder: ClientBuilder) -> crate::Result<ClientHandle> {
|
||||||
let timeout = builder.timeout;
|
let timeout = builder.timeout;
|
||||||
let builder = builder.inner;
|
let builder = builder.inner;
|
||||||
let (tx, rx) = mpsc::unbounded::<(async_impl::Request, OneshotResponse)>();
|
let (tx, rx) = mpsc::unbounded_channel::<(async_impl::Request, OneshotResponse)>();
|
||||||
let (spawn_tx, spawn_rx) = oneshot::channel::<crate::Result<()>>();
|
let (spawn_tx, spawn_rx) = oneshot::channel::<crate::Result<()>>();
|
||||||
let handle = thread::Builder::new()
|
let handle = thread::Builder::new()
|
||||||
.name("reqwest-internal-sync-runtime".into())
|
.name("reqwest-internal-sync-runtime".into())
|
||||||
@@ -595,7 +593,7 @@ impl ClientHandle {
|
|||||||
|
|
||||||
let mut rx = rx;
|
let mut rx = rx;
|
||||||
|
|
||||||
while let Some((req, req_tx)) = rx.next().await {
|
while let Some((req, req_tx)) = rx.recv().await {
|
||||||
let req_fut = client.execute(req);
|
let req_fut = client.execute(req);
|
||||||
tokio::spawn(forward(req_fut, req_tx));
|
tokio::spawn(forward(req_fut, req_tx));
|
||||||
}
|
}
|
||||||
@@ -633,7 +631,7 @@ impl ClientHandle {
|
|||||||
.tx
|
.tx
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("core thread exited early")
|
.expect("core thread exited early")
|
||||||
.unbounded_send((req, tx))
|
.send((req, tx))
|
||||||
.expect("core thread panicked");
|
.expect("core thread panicked");
|
||||||
|
|
||||||
let result: Result<crate::Result<async_impl::Response>, wait::Waited<crate::Error>> =
|
let result: Result<crate::Result<async_impl::Response>, wait::Waited<crate::Error>> =
|
||||||
@@ -644,10 +642,10 @@ impl ClientHandle {
|
|||||||
};
|
};
|
||||||
wait::timeout(f, self.timeout.0)
|
wait::timeout(f, self.timeout.0)
|
||||||
} else {
|
} else {
|
||||||
wait::timeout(
|
let f = async move {
|
||||||
rx.map_err(|_canceled| event_loop_panicked()),
|
rx.await.map_err(|_canceled| event_loop_panicked())
|
||||||
self.timeout.0,
|
};
|
||||||
)
|
wait::timeout(f, self.timeout.0)
|
||||||
};
|
};
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
@@ -677,7 +675,7 @@ where
|
|||||||
Poll::Ready(val) => Poll::Ready(Some(val)),
|
Poll::Ready(val) => Poll::Ready(Some(val)),
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
// check if the callback is canceled
|
// check if the callback is canceled
|
||||||
futures_core::ready!(tx.poll_canceled(cx));
|
futures_core::ready!(tx.poll_closed(cx));
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user