Update to hyper 0.13

This commit is contained in:
Gleb Pomykalov
2019-12-11 03:24:05 +03:00
committed by Sean McArthur
parent db2de90e42
commit 0f32c4a01a
23 changed files with 469 additions and 219 deletions

View File

@@ -2,6 +2,8 @@ use std::fmt;
use std::fs::File;
use std::future::Future;
use std::io::{self, Cursor, Read};
use std::mem::{self, MaybeUninit};
use std::ptr;
use bytes::Bytes;
@@ -246,9 +248,17 @@ async fn send_future(sender: Sender) -> Result<(), crate::Error> {
if buf.is_empty() {
if buf.remaining_mut() == 0 {
buf.reserve(8192);
// zero out the reserved memory
unsafe {
let uninit = mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut());
ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
}
}
match body.read(unsafe { buf.bytes_mut() }) {
let bytes = unsafe {
mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut())
};
match body.read(bytes) {
Ok(0) => {
// The buffer was empty and nothing's left to
// read. Return.
@@ -270,7 +280,7 @@ async fn send_future(sender: Sender) -> Result<(), crate::Error> {
let buf_len = buf.len() as u64;
tx.as_mut()
.expect("tx only taken on error")
.send_data(buf.take().freeze().into())
.send_data(buf.split().freeze())
.await
.map_err(crate::error::body)?;

View File

@@ -593,9 +593,8 @@ impl ClientHandle {
let handle = thread::Builder::new()
.name("reqwest-internal-sync-runtime".into())
.spawn(move || {
use tokio::runtime::current_thread::Runtime;
let mut rt = match Runtime::new().map_err(crate::error::builder) {
use tokio::runtime;
let mut rt = match runtime::Builder::new().basic_scheduler().enable_all().build().map_err(crate::error::builder) {
Err(e) => {
if let Err(e) = spawn_tx.send(Err(e)) {
error!("Failed to communicate runtime creation failure: {:?}", e);
@@ -685,7 +684,6 @@ impl ClientHandle {
KeepCoreThreadAlive(Some(self.inner.clone())),
)),
Err(wait::Waited::TimedOut(e)) => Err(crate::error::request(e).with_url(url)),
Err(wait::Waited::Executor(err)) => Err(crate::error::request(err).with_url(url)),
Err(wait::Waited::Inner(err)) => Err(err.with_url(url)),
}
}
@@ -705,7 +703,7 @@ where
Poll::Ready(val) => Poll::Ready(Some(val)),
Poll::Pending => {
// check if the callback is canceled
futures_core::ready!(tx.poll_cancel(cx));
futures_core::ready!(tx.poll_canceled(cx));
Poll::Ready(None)
}
}

View File

@@ -1,7 +1,7 @@
use std::fmt;
use std::convert::TryFrom;
use base64::encode;
use http::HttpTryFrom;
use serde::Serialize;
#[cfg(feature = "json")]
use serde_json;
@@ -140,13 +140,15 @@ impl RequestBuilder {
/// ```
pub fn header<K, V>(mut self, key: K, value: V) -> RequestBuilder
where
HeaderName: HttpTryFrom<K>,
HeaderValue: HttpTryFrom<V>,
HeaderName: TryFrom<K>,
HeaderValue: TryFrom<V>,
<HeaderName as TryFrom<K>>::Error: Into<http::Error>,
<HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
{
let mut error = None;
if let Ok(ref mut req) = self.request {
match <HeaderName as HttpTryFrom<K>>::try_from(key) {
Ok(key) => match <HeaderValue as HttpTryFrom<V>>::try_from(value) {
match <HeaderName as TryFrom<K>>::try_from(key) {
Ok(key) => match <HeaderValue as TryFrom<V>>::try_from(value) {
Ok(value) => {
req.headers_mut().append(key, value);
}

View File

@@ -220,7 +220,6 @@ impl Response {
pub fn json<T: DeserializeOwned>(self) -> crate::Result<T> {
wait::timeout(self.inner.json(), self.timeout).map_err(|e| match e {
wait::Waited::TimedOut(e) => crate::error::decode(e),
wait::Waited::Executor(e) => crate::error::decode(e),
wait::Waited::Inner(e) => e,
})
}
@@ -269,7 +268,6 @@ impl Response {
wait::timeout(self.inner.text_with_charset(default_encoding), self.timeout).map_err(|e| {
match e {
wait::Waited::TimedOut(e) => crate::error::decode(e),
wait::Waited::Executor(e) => crate::error::decode(e),
wait::Waited::Inner(e) => e,
}
})
@@ -375,7 +373,6 @@ impl Read for Response {
let timeout = self.timeout;
wait::timeout(self.body_mut().read(buf), timeout).map_err(|e| match e {
wait::Waited::TimedOut(e) => crate::error::decode(e).into_io(),
wait::Waited::Executor(e) => crate::error::decode(e).into_io(),
wait::Waited::Inner(e) => e,
})
}

View File

@@ -1,29 +1,27 @@
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread::{self, Thread};
use std::time::Duration;
use tokio::clock;
use tokio_executor::{
enter,
park::{Park, ParkThread, Unpark, UnparkThread},
};
use tokio::time::Instant;
pub(crate) fn timeout<F, I, E>(fut: F, timeout: Option<Duration>) -> Result<I, Waited<E>>
where
F: Future<Output = Result<I, E>>,
{
let _entered =
enter().map_err(|_| Waited::Executor(crate::error::BlockingClientInAsyncContext))?;
enter();
let deadline = timeout.map(|d| {
log::trace!("wait at most {:?}", d);
clock::now() + d
Instant::now() + d
});
let mut park = ParkThread::new();
// Arc shouldn't be necessary, since UnparkThread is reference counted internally,
let thread = ThreadWaker(thread::current());
// Arc shouldn't be necessary, since `Thread` is reference counted internally,
// but let's just stay safe for now.
let waker = futures_util::task::waker(Arc::new(UnparkWaker(park.unpark())));
let waker = futures_util::task::waker(Arc::new(thread));
let mut cx = Context::from_waker(&waker);
futures_util::pin_mut!(fut);
@@ -36,17 +34,16 @@ where
}
if let Some(deadline) = deadline {
let now = clock::now();
let now = Instant::now();
if now >= deadline {
log::trace!("wait timeout exceeded");
return Err(Waited::TimedOut(crate::error::TimedOut));
}
log::trace!("park timeout {:?}", deadline - now);
park.park_timeout(deadline - now)
.expect("ParkThread doesn't error");
thread::park_timeout(deadline - now);
} else {
park.park().expect("ParkThread doesn't error");
thread::park();
}
}
}
@@ -54,14 +51,24 @@ where
#[derive(Debug)]
pub(crate) enum Waited<E> {
TimedOut(crate::error::TimedOut),
Executor(crate::error::BlockingClientInAsyncContext),
Inner(E),
}
struct UnparkWaker(UnparkThread);
struct ThreadWaker(Thread);
impl futures_util::task::ArcWake for UnparkWaker {
impl futures_util::task::ArcWake for ThreadWaker {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.0.unpark();
}
}
fn enter() {
// Check we aren't already in a runtime
#[cfg(debug_assertions)]
{
tokio::runtime::Builder::new()
.build()
.expect("build shell runtime")
.enter(|| {});
}
}