refactor(lib): update to 2018 edition

This commit is contained in:
Sean McArthur
2019-07-09 14:50:51 -07:00
parent 79ae89e066
commit da9b0319ef
37 changed files with 358 additions and 398 deletions

View File

@@ -13,7 +13,7 @@ matrix:
- rust: stable
env: FEATURES="--no-default-features"
# Minimum Supported Rust Version
- rust: 1.27.0
- rust: 1.31.0
env: FEATURES="--no-default-features --features runtime" BUILD_ONLY="1"
before_script:

View File

@@ -10,6 +10,7 @@ license = "MIT"
authors = ["Sean McArthur <sean@seanmonstar.com>"]
keywords = ["http", "hyper", "hyperium"]
categories = ["network-programming", "web-programming::http-client", "web-programming::http-server"]
edition = "2018"
publish = false

View File

@@ -236,7 +236,7 @@ fn spawn_hello(rt: &mut Runtime, opts: &Opts) -> SocketAddr {
let body = opts.response_body;
let srv = Server::bind(&addr)
.http2_only(opts.http2);
.http2_only(opts.http2)
.http2_initial_stream_window_size_(opts.http2_stream_window)
.http2_initial_connection_window_size_(opts.http2_conn_window)
.serve(move || {

View File

@@ -4,9 +4,6 @@ use rustc_version::{version, Version};
fn main() {
let version = version().unwrap();
if version >= Version::parse("1.30.0").unwrap() {
println!("cargo:rustc-cfg=error_source");
}
if version >= Version::parse("1.34.0").unwrap() {
println!("cargo:rustc-cfg=try_from");
}

View File

@@ -9,12 +9,12 @@ use tokio_buf::SizeHint;
use h2;
use http::HeaderMap;
use common::Never;
use crate::common::Never;
use super::internal::{FullDataArg, FullDataRet};
use super::{Chunk, Payload};
use upgrade::OnUpgrade;
use crate::upgrade::OnUpgrade;
type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
type BodySender = mpsc::Sender<Result<Chunk, crate::Error>>;
/// A stream of `Chunk`s, used when receiving bodies.
///
@@ -34,7 +34,7 @@ enum Kind {
Chan {
content_length: Option<u64>,
abort_rx: oneshot::Receiver<()>,
rx: mpsc::Receiver<Result<Chunk, ::Error>>,
rx: mpsc::Receiver<Result<Chunk, crate::Error>>,
},
H2 {
content_length: Option<u64>,
@@ -200,7 +200,7 @@ impl Body {
}))
}
fn poll_eof(&mut self) -> Poll<Option<Chunk>, ::Error> {
fn poll_eof(&mut self) -> Poll<Option<Chunk>, crate::Error> {
match self.take_delayed_eof() {
Some(DelayEof::NotEof(mut delay)) => {
match self.poll_inner() {
@@ -238,7 +238,7 @@ impl Body {
}
}
fn poll_inner(&mut self) -> Poll<Option<Chunk>, ::Error> {
fn poll_inner(&mut self) -> Poll<Option<Chunk>, crate::Error> {
match self.kind {
Kind::Once(ref mut val) => Ok(Async::Ready(val.take())),
Kind::Chan {
@@ -247,7 +247,7 @@ impl Body {
ref mut abort_rx,
} => {
if let Ok(Async::Ready(())) = abort_rx.poll() {
return Err(::Error::new_body_write("body write aborted"));
return Err(crate::Error::new_body_write("body write aborted"));
}
match rx.poll().expect("mpsc cannot error") {
@@ -267,16 +267,16 @@ impl Body {
recv: ref mut h2, ..
} => h2
.poll()
.map(|async| {
async.map(|opt| {
.map(|r#async| {
r#async.map(|opt| {
opt.map(|bytes| {
let _ = h2.release_capacity().release_capacity(bytes.len());
Chunk::from(bytes)
})
})
})
.map_err(::Error::new_body),
Kind::Wrapped(ref mut s) => s.poll().map_err(::Error::new_body),
.map_err(crate::Error::new_body),
Kind::Wrapped(ref mut s) => s.poll().map_err(crate::Error::new_body),
}
}
}
@@ -291,7 +291,7 @@ impl Default for Body {
impl Payload for Body {
type Data = Chunk;
type Error = ::Error;
type Error = crate::Error;
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
self.poll_eof()
@@ -301,7 +301,7 @@ impl Payload for Body {
match self.kind {
Kind::H2 {
recv: ref mut h2, ..
} => h2.poll_trailers().map_err(::Error::new_h2),
} => h2.poll_trailers().map_err(crate::Error::new_h2),
_ => Ok(Async::Ready(None)),
}
}
@@ -336,7 +336,7 @@ impl Payload for Body {
impl ::http_body::Body for Body {
type Data = Chunk;
type Error = ::Error;
type Error = crate::Error;
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
<Self as Payload>::poll_data(self)
@@ -366,7 +366,7 @@ impl ::http_body::Body for Body {
impl Stream for Body {
type Item = Chunk;
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.poll_data()
@@ -395,13 +395,13 @@ impl fmt::Debug for Body {
impl Sender {
/// Check to see if this `Sender` can send more data.
pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
pub fn poll_ready(&mut self) -> Poll<(), crate::Error> {
match self.abort_tx.poll_cancel() {
Ok(Async::Ready(())) | Err(_) => return Err(::Error::new_closed()),
Ok(Async::Ready(())) | Err(_) => return Err(crate::Error::new_closed()),
Ok(Async::NotReady) => (),
}
self.tx.poll_ready().map_err(|_| ::Error::new_closed())
self.tx.poll_ready().map_err(|_| crate::Error::new_closed())
}
/// Sends data on this channel.
@@ -422,14 +422,14 @@ impl Sender {
let _ = self.abort_tx.send(());
}
pub(crate) fn send_error(&mut self, err: ::Error) {
pub(crate) fn send_error(&mut self, err: crate::Error) {
let _ = self.tx.try_send(Err(err));
}
}
impl Sink for Sender {
type SinkItem = Chunk;
type SinkError = ::Error;
type SinkError = crate::Error;
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
Ok(Async::Ready(()))
@@ -438,7 +438,7 @@ impl Sink for Sender {
fn start_send(&mut self, msg: Chunk) -> StartSend<Self::SinkItem, Self::SinkError> {
match self.poll_ready()? {
Async::Ready(_) => {
self.send_data(msg).map_err(|_| ::Error::new_closed())?;
self.send_data(msg).map_err(|_| crate::Error::new_closed())?;
Ok(AsyncSink::Ready)
}
Async::NotReady => Ok(AsyncSink::NotReady(msg)),

View File

@@ -176,7 +176,7 @@ mod tests {
let mut dst = Vec::with_capacity(128);
b.iter(|| {
let chunk = ::Chunk::from(s);
let chunk = crate::Chunk::from(s);
dst.put(chunk);
::test::black_box(&dst);
dst.clear();

View File

@@ -65,7 +65,7 @@ pub trait Payload: Send + 'static {
// The only thing a user *could* do is reference the method, but DON'T
// DO THAT! :)
#[doc(hidden)]
fn __hyper_full_data(&mut self, FullDataArg) -> FullDataRet<Self::Data> {
fn __hyper_full_data(&mut self, _: FullDataArg) -> FullDataRet<Self::Data> {
FullDataRet(None)
}
}

View File

@@ -18,12 +18,12 @@ use futures::future::{self, Either, Executor};
use h2;
use tokio_io::{AsyncRead, AsyncWrite};
use body::Payload;
use common::Exec;
use upgrade::Upgraded;
use proto;
use crate::body::Payload;
use crate::common::Exec;
use crate::upgrade::Upgraded;
use crate::proto;
use super::dispatch;
use {Body, Request, Response};
use crate::{Body, Request, Response};
type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
@@ -39,7 +39,7 @@ type ConnEither<T, B> = Either<
/// Returns a `Handshake` future over some IO.
///
/// This is a shortcut for `Builder::new().handshake(io)`.
pub fn handshake<T>(io: T) -> Handshake<T, ::Body>
pub fn handshake<T>(io: T) -> Handshake<T, crate::Body>
where
T: AsyncRead + AsyncWrite + Send + 'static,
{
@@ -98,7 +98,7 @@ pub struct Handshake<T, B> {
pub struct ResponseFuture {
// for now, a Box is used to hide away the internal `B`
// that can be returned if canceled
inner: Box<dyn Future<Item=Response<Body>, Error=::Error> + Send>,
inner: Box<dyn Future<Item=Response<Body>, Error=crate::Error> + Send>,
}
/// Deconstructed parts of a `Connection`.
@@ -145,7 +145,7 @@ impl<B> SendRequest<B>
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
pub fn poll_ready(&mut self) -> Poll<(), crate::Error> {
self.dispatch.poll_ready()
}
@@ -235,7 +235,7 @@ where
},
Err(_req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled().with("connection was not ready");
let err = crate::Error::new_canceled().with("connection was not ready");
Either::B(future::err(err))
}
};
@@ -245,7 +245,7 @@ where
}
}
pub(crate) fn send_request_retryable(&mut self, req: Request<B>) -> impl Future<Item = Response<Body>, Error = (::Error, Option<Request<B>>)>
pub(crate) fn send_request_retryable(&mut self, req: Request<B>) -> impl Future<Item = Response<Body>, Error = (crate::Error, Option<Request<B>>)>
where
B: Send,
{
@@ -262,7 +262,7 @@ where
},
Err(req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled().with("connection was not ready");
let err = crate::Error::new_canceled().with("connection was not ready");
Either::B(future::err((err, Some(req))))
}
}
@@ -305,7 +305,7 @@ impl<B> Http2SendRequest<B>
where
B: Payload + 'static,
{
pub(super) fn send_request_retryable(&mut self, req: Request<B>) -> impl Future<Item=Response<Body>, Error=(::Error, Option<Request<B>>)>
pub(super) fn send_request_retryable(&mut self, req: Request<B>) -> impl Future<Item=Response<Body>, Error=(crate::Error, Option<Request<B>>)>
where
B: Send,
{
@@ -322,7 +322,7 @@ where
},
Err(req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled().with("connection was not ready");
let err = crate::Error::new_canceled().with("connection was not ready");
Either::B(future::err((err, Some(req))))
}
}
@@ -380,7 +380,7 @@ where
/// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
/// to work with this function; or use the `without_shutdown` wrapper.
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
pub fn poll_without_shutdown(&mut self) -> Poll<(), crate::Error> {
match self.inner.as_mut().expect("already upgraded") {
&mut Either::A(ref mut h1) => {
h1.poll_without_shutdown()
@@ -393,9 +393,9 @@ where
/// Prevent shutdown of the underlying IO object at the end of service the request,
/// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
pub fn without_shutdown(self) -> impl Future<Item=Parts<T>, Error=::Error> {
pub fn without_shutdown(self) -> impl Future<Item=Parts<T>, Error=crate::Error> {
let mut conn = Some(self);
::futures::future::poll_fn(move || -> ::Result<_> {
::futures::future::poll_fn(move || -> crate::Result<_> {
try_ready!(conn.as_mut().unwrap().poll_without_shutdown());
Ok(conn.take().unwrap().into_parts().into())
})
@@ -408,7 +408,7 @@ where
B: Payload + 'static,
{
type Item = ();
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match try_ready!(self.inner.poll()) {
@@ -552,7 +552,7 @@ where
B: Payload + 'static,
{
type Item = (SendRequest<B>, Connection<T, B>);
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let io = self.io.take().expect("polled more than once");
@@ -601,7 +601,7 @@ impl<T, B> fmt::Debug for Handshake<T, B> {
impl Future for ResponseFuture {
type Item = Response<Body>;
type Error = ::Error;
type Error = crate::Error;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@@ -620,7 +620,7 @@ impl fmt::Debug for ResponseFuture {
impl<B> Future for WhenReady<B> {
type Item = SendRequest<B>;
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut tx = self.tx.take().expect("polled after complete");

View File

@@ -11,7 +11,7 @@ use std::{fmt, mem};
use bytes::{BufMut, Bytes, BytesMut};
use futures::Future;
use http::{uri, Response, Uri};
use ::http::{uri, Response, Uri};
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "runtime")] pub mod dns;
@@ -68,9 +68,9 @@ impl Destination {
///
/// Returns an error if the uri contains no authority or
/// no scheme.
pub fn try_from_uri(uri: Uri) -> ::Result<Self> {
uri.authority_part().ok_or(::error::Parse::Uri)?;
uri.scheme_part().ok_or(::error::Parse::Uri)?;
pub fn try_from_uri(uri: Uri) -> crate::Result<Self> {
uri.authority_part().ok_or(crate::error::Parse::Uri)?;
uri.scheme_part().ok_or(crate::error::Parse::Uri)?;
Ok(Destination { uri })
}
@@ -116,8 +116,8 @@ impl Destination {
/// # Error
///
/// Returns an error if the string is not a valid scheme.
pub fn set_scheme(&mut self, scheme: &str) -> ::Result<()> {
let scheme = scheme.parse().map_err(::error::Parse::from)?;
pub fn set_scheme(&mut self, scheme: &str) -> crate::Result<()> {
let scheme = scheme.parse().map_err(crate::error::Parse::from)?;
self.update_uri(move |parts| {
parts.scheme = Some(scheme);
})
@@ -143,19 +143,19 @@ impl Destination {
/// # Error
///
/// Returns an error if the string is not a valid hostname.
pub fn set_host(&mut self, host: &str) -> ::Result<()> {
pub fn set_host(&mut self, host: &str) -> crate::Result<()> {
// Prevent any userinfo setting, it's bad!
if host.contains('@') {
return Err(::error::Parse::Uri.into());
return Err(crate::error::Parse::Uri.into());
}
let auth = if let Some(port) = self.port() {
let bytes = Bytes::from(format!("{}:{}", host, port));
uri::Authority::from_shared(bytes)
.map_err(::error::Parse::from)?
.map_err(crate::error::Parse::from)?
} else {
let auth = host.parse::<uri::Authority>().map_err(::error::Parse::from)?;
let auth = host.parse::<uri::Authority>().map_err(crate::error::Parse::from)?;
if auth.port_part().is_some() { // std::uri::Authority::Uri
return Err(::error::Parse::Uri.into());
return Err(crate::error::Parse::Uri.into());
}
auth
};
@@ -218,7 +218,7 @@ impl Destination {
.expect("valid uri should be valid with port");
}
fn update_uri<F>(&mut self, f: F) -> ::Result<()>
fn update_uri<F>(&mut self, f: F) -> crate::Result<()>
where
F: FnOnce(&mut uri::Parts)
{
@@ -236,7 +236,7 @@ impl Destination {
},
Err(err) => {
self.uri = old_uri;
Err(::error::Parse::from(err).into())
Err(crate::error::Parse::from(err).into())
},
}
}
@@ -254,7 +254,7 @@ impl Destination {
#[cfg(try_from)]
impl TryFrom<Uri> for Destination {
type Error = ::error::Error;
type Error = crate::error::Error;
fn try_from(uri: Uri) -> Result<Self, Self::Error> {
Destination::try_from_uri(uri)
@@ -325,7 +325,7 @@ impl Connected {
// ===== impl Extra =====
impl Extra {
pub(super) fn set(&self, res: &mut Response<::Body>) {
pub(super) fn set(&self, res: &mut Response<crate::Body>) {
self.0.set(res);
}
}
@@ -345,7 +345,7 @@ impl fmt::Debug for Extra {
trait ExtraInner: Send + Sync {
fn clone_box(&self) -> Box<dyn ExtraInner>;
fn set(&self, res: &mut Response<::Body>);
fn set(&self, res: &mut Response<crate::Body>);
}
// This indirection allows the `Connected` to have a type-erased "extra" value,
@@ -362,7 +362,7 @@ where
Box::new(self.clone())
}
fn set(&self, res: &mut Response<::Body>) {
fn set(&self, res: &mut Response<crate::Body>) {
res.extensions_mut().insert(self.0.clone());
}
}
@@ -383,7 +383,7 @@ where
Box::new(self.clone())
}
fn set(&self, res: &mut Response<::Body>) {
fn set(&self, res: &mut Response<crate::Body>) {
self.0.set(res);
res.extensions_mut().insert(self.1.clone());
}
@@ -567,7 +567,7 @@ mod tests {
let c1 = Connected::new()
.extra(Ex1(41));
let mut res1 = ::Response::new(::Body::empty());
let mut res1 = crate::Response::new(crate::Body::empty());
assert_eq!(res1.extensions().get::<Ex1>(), None);
@@ -590,7 +590,7 @@ mod tests {
.extra(Ex2("zoom"))
.extra(Ex3("pew pew"));
let mut res1 = ::Response::new(::Body::empty());
let mut res1 = crate::Response::new(crate::Body::empty());
assert_eq!(res1.extensions().get::<Ex1>(), None);
assert_eq!(res1.extensions().get::<Ex2>(), None);
@@ -612,7 +612,7 @@ mod tests {
.extra(Ex2("hiccup"))
.extra(Ex1(99));
let mut res2 = ::Response::new(::Body::empty());
let mut res2 = crate::Response::new(crate::Body::empty());
c2
.extra

View File

@@ -2,10 +2,10 @@ use futures::{future, Async, Future, Poll, Stream};
use futures::sync::{mpsc, oneshot};
use want;
use common::Never;
use crate::common::Never;
pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>;
pub type Promise<T> = oneshot::Receiver<Result<T, ::Error>>;
pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
pub type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;
pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (tx, rx) = mpsc::unbounded();
@@ -51,9 +51,9 @@ pub struct UnboundedSender<T, U> {
}
impl<T, U> Sender<T, U> {
pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
pub fn poll_ready(&mut self) -> Poll<(), crate::Error> {
self.giver.poll_want()
.map_err(|_| ::Error::new_closed())
.map_err(|_| crate::Error::new_closed())
}
pub fn is_ready(&self) -> bool {
@@ -167,14 +167,14 @@ struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
impl<T, U> Drop for Envelope<T, U> {
fn drop(&mut self) {
if let Some((val, cb)) = self.0.take() {
let _ = cb.send(Err((::Error::new_canceled().with("connection closed"), Some(val))));
let _ = cb.send(Err((crate::Error::new_canceled().with("connection closed"), Some(val))));
}
}
}
pub enum Callback<T, U> {
Retry(oneshot::Sender<Result<U, (::Error, Option<T>)>>),
NoRetry(oneshot::Sender<Result<U, ::Error>>),
Retry(oneshot::Sender<Result<U, (crate::Error, Option<T>)>>),
NoRetry(oneshot::Sender<Result<U, crate::Error>>),
}
impl<T, U> Callback<T, U> {
@@ -192,7 +192,7 @@ impl<T, U> Callback<T, U> {
}
}
pub(crate) fn send(self, val: Result<U, (::Error, Option<T>)>) {
pub(crate) fn send(self, val: Result<U, (crate::Error, Option<T>)>) {
match self {
Callback::Retry(tx) => {
let _ = tx.send(val);
@@ -205,7 +205,7 @@ impl<T, U> Callback<T, U> {
pub(crate) fn send_when(
self,
mut when: impl Future<Item=U, Error=(::Error, Option<T>)>,
mut when: impl Future<Item=U, Error=(crate::Error, Option<T>)>,
) -> impl Future<Item=(), Error=()> {
let mut cb = Some(self);
@@ -266,7 +266,7 @@ mod tests {
.expect_err("promise should error");
match (err.0.kind(), err.1) {
(&::error::Kind::Canceled, Some(_)) => (),
(&crate::error::Kind::Canceled, Some(_)) => (),
e => panic!("expected Error::Cancel(_), found {:?}", e),
}
@@ -312,7 +312,7 @@ mod tests {
#[cfg(feature = "nightly")]
#[bench]
fn giver_queue_throughput(b: &mut test::Bencher) {
use {Body, Request, Response};
use crate::{Body, Request, Response};
let (mut tx, mut rx) = super::channel::<Request<Body>, Response<Body>>();
b.iter(move || {

View File

@@ -89,8 +89,8 @@ use http::{Method, Request, Response, Uri, Version};
use http::header::{HeaderValue, HOST};
use http::uri::Scheme;
use body::{Body, Payload};
use common::{lazy as hyper_lazy, Lazy};
use crate::body::{Body, Payload};
use crate::common::{lazy as hyper_lazy, Lazy};
use self::connect::{Alpn, Connect, Connected, Destination};
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
@@ -237,7 +237,7 @@ where C: Connect + Sync + 'static,
Version::HTTP_11 => (),
Version::HTTP_10 => if is_http_connect {
warn!("CONNECT is not allowed for HTTP/1.0");
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method())));
return ResponseFuture::new(Box::new(future::err(crate::Error::new_user_unsupported_request_method())));
},
other_h2 @ Version::HTTP_2 => if self.config.ver != Ver::Http2 {
return ResponseFuture::error_version(other_h2);
@@ -257,7 +257,7 @@ where C: Connect + Sync + 'static,
ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key)))
}
fn retryably_send_request(&self, req: Request<B>, pool_key: PoolKey) -> impl Future<Item=Response<Body>, Error=::Error> {
fn retryably_send_request(&self, req: Request<B>, pool_key: PoolKey) -> impl Future<Item=Response<Body>, Error=crate::Error> {
let client = self.clone();
let uri = req.uri().clone();
@@ -320,7 +320,7 @@ where C: Connect + Sync + 'static,
};
} else if req.method() == &Method::CONNECT {
debug!("client does not support CONNECT requests over HTTP2");
return Either::A(future::err(ClientError::Normal(::Error::new_user_unsupported_request_method())));
return Either::A(future::err(ClientError::Normal(crate::Error::new_user_unsupported_request_method())));
}
let fut = pooled.send_request_retryable(req)
@@ -478,7 +478,7 @@ where C: Connect + Sync + 'static,
}
fn connect_to(&self, uri: Uri, pool_key: PoolKey)
-> impl Lazy<Item=Pooled<PoolClient<B>>, Error=::Error>
-> impl Lazy<Item=Pooled<PoolClient<B>>, Error=crate::Error>
{
let executor = self.conn_builder.exec.clone();
let pool = self.pool.clone();
@@ -498,12 +498,12 @@ where C: Connect + Sync + 'static,
let connecting = match pool.connecting(&pool_key, ver) {
Some(lock) => lock,
None => {
let canceled = ::Error::new_canceled().with("HTTP/2 connection in progress");
let canceled = crate::Error::new_canceled().with("HTTP/2 connection in progress");
return Either::B(future::err(canceled));
}
};
Either::A(connector.connect(dst)
.map_err(::Error::new_connect)
.map_err(crate::Error::new_connect)
.and_then(move |(io, connected)| {
// If ALPN is h2 and we aren't http2_only already,
// then we need to convert our pool checkout into
@@ -517,7 +517,7 @@ where C: Connect + Sync + 'static,
None => {
// Another connection has already upgraded,
// the pool checkout should finish up for us.
let canceled = ::Error::new_canceled().with("ALPN upgraded to HTTP/2");
let canceled = crate::Error::new_canceled().with("ALPN upgraded to HTTP/2");
return Either::B(future::err(canceled));
}
}
@@ -583,11 +583,11 @@ impl<C, B> fmt::Debug for Client<C, B> {
/// This is returned by `Client::request` (and `Client::get`).
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
inner: Box<dyn Future<Item=Response<Body>, Error=::Error> + Send>,
inner: Box<dyn Future<Item=Response<Body>, Error=crate::Error> + Send>,
}
impl ResponseFuture {
fn new(fut: Box<dyn Future<Item=Response<Body>, Error=::Error> + Send>) -> Self {
fn new(fut: Box<dyn Future<Item=Response<Body>, Error=crate::Error> + Send>) -> Self {
Self {
inner: fut,
}
@@ -595,7 +595,7 @@ impl ResponseFuture {
fn error_version(ver: Version) -> Self {
warn!("Request has unsupported version \"{:?}\"", ver);
ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_version())))
ResponseFuture::new(Box::new(future::err(crate::Error::new_user_unsupported_version())))
}
}
@@ -607,7 +607,7 @@ impl fmt::Debug for ResponseFuture {
impl Future for ResponseFuture {
type Item = Response<Body>;
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
@@ -627,7 +627,7 @@ enum PoolTx<B> {
}
impl<B> PoolClient<B> {
fn poll_ready(&mut self) -> Poll<(), ::Error> {
fn poll_ready(&mut self) -> Poll<(), crate::Error> {
match self.tx {
PoolTx::Http1(ref mut tx) => tx.poll_ready(),
PoolTx::Http2(_) => Ok(Async::Ready(())),
@@ -661,7 +661,7 @@ impl<B> PoolClient<B> {
}
impl<B: Payload + 'static> PoolClient<B> {
fn send_request_retryable(&mut self, req: Request<B>) -> impl Future<Item = Response<Body>, Error = (::Error, Option<Request<B>>)>
fn send_request_retryable(&mut self, req: Request<B>) -> impl Future<Item = Response<Body>, Error = (crate::Error, Option<Request<B>>)>
where
B: Send,
{
@@ -713,17 +713,17 @@ where
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
enum ClientError<B> {
Normal(::Error),
Normal(crate::Error),
Canceled {
connection_reused: bool,
req: Request<B>,
reason: ::Error,
reason: crate::Error,
}
}
impl<B> ClientError<B> {
fn map_with_reused(conn_reused: bool)
-> impl Fn((::Error, Option<Request<B>>)) -> Self
-> impl Fn((crate::Error, Option<Request<B>>)) -> Self
{
move |(err, orig_req)| {
if let Some(req) = orig_req {
@@ -797,7 +797,7 @@ fn authority_form(uri: &mut Uri) {
};
}
fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> ::Result<String> {
fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> crate::Result<String> {
let uri_clone = uri.clone();
match (uri_clone.scheme_part(), uri_clone.authority_part()) {
(Some(scheme), Some(auth)) => {
@@ -819,7 +819,7 @@ fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> ::Result<String> {
},
_ => {
debug!("Client requires absolute-form URIs, received: {:?}", uri);
Err(::Error::new_user_absolute_uri_required())
Err(crate::Error::new_user_absolute_uri_required())
}
}
}

View File

@@ -9,7 +9,7 @@ use futures::sync::oneshot;
#[cfg(feature = "runtime")]
use tokio_timer::Interval;
use common::Exec;
use crate::common::Exec;
use super::Ver;
// FIXME: allow() required due to `impl Trait` leaking types to this lint
@@ -75,7 +75,7 @@ struct PoolInner<T> {
// A oneshot channel is used to allow the interval to be notified when
// the Pool completely drops. That way, the interval can cancel immediately.
#[cfg(feature = "runtime")]
idle_interval_ref: Option<oneshot::Sender<::common::Never>>,
idle_interval_ref: Option<oneshot::Sender<crate::common::Never>>,
#[cfg(feature = "runtime")]
exec: Exec,
timeout: Option<Duration>,
@@ -569,7 +569,7 @@ pub(super) struct Checkout<T> {
}
impl<T: Poolable> Checkout<T> {
fn poll_waiter(&mut self) -> Poll<Option<Pooled<T>>, ::Error> {
fn poll_waiter(&mut self) -> Poll<Option<Pooled<T>>, crate::Error> {
static CANCELED: &str = "pool checkout failed";
if let Some(mut rx) = self.waiter.take() {
match rx.poll() {
@@ -577,14 +577,14 @@ impl<T: Poolable> Checkout<T> {
if value.is_open() {
Ok(Async::Ready(Some(self.pool.reuse(&self.key, value))))
} else {
Err(::Error::new_canceled().with(CANCELED))
Err(crate::Error::new_canceled().with(CANCELED))
}
},
Ok(Async::NotReady) => {
self.waiter = Some(rx);
Ok(Async::NotReady)
},
Err(_canceled) => Err(::Error::new_canceled().with(CANCELED)),
Err(_canceled) => Err(crate::Error::new_canceled().with(CANCELED)),
}
} else {
Ok(Async::Ready(None))
@@ -644,7 +644,7 @@ impl<T: Poolable> Checkout<T> {
impl<T: Poolable> Future for Checkout<T> {
type Item = Pooled<T>;
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(pooled) = try_ready!(self.poll_waiter()) {
@@ -654,7 +654,7 @@ impl<T: Poolable> Future for Checkout<T> {
if let Some(pooled) = self.checkout() {
Ok(Async::Ready(pooled))
} else if !self.pool.is_enabled() {
Err(::Error::new_canceled().with("pool is disabled"))
Err(crate::Error::new_canceled().with("pool is disabled"))
} else {
Ok(Async::NotReady)
}
@@ -723,7 +723,7 @@ struct IdleInterval<T> {
// This allows the IdleInterval to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on,
// but Err(Canceled) will be received when the Pool is dropped.
pool_drop_notifier: oneshot::Receiver<::common::Never>,
pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
}
#[cfg(feature = "runtime")]
@@ -783,7 +783,7 @@ mod tests {
use std::time::Duration;
use futures::{Async, Future};
use futures::future;
use common::Exec;
use crate::common::Exec;
use super::{Connecting, Key, Poolable, Pool, Reservation, WeakOpt};
/// Test unique reservations.

View File

@@ -6,7 +6,7 @@ use futures::future::poll_fn;
use futures::sync::oneshot;
use tokio::runtime::current_thread::Runtime;
use mock::MockConnector;
use crate::mock::MockConnector;
use super::*;
#[test]
@@ -20,7 +20,7 @@ fn retryable_request() {
let sock2 = connector.mock("http://mock.local");
let client = Client::builder()
.build::<_, ::Body>(connector);
.build::<_, crate::Body>(connector);
client.pool.no_timer();
@@ -67,7 +67,7 @@ fn conn_reset_after_write() {
let sock1 = connector.mock("http://mock.local");
let client = Client::builder()
.build::<_, ::Body>(connector);
.build::<_, crate::Body>(connector);
client.pool.no_timer();
@@ -119,11 +119,11 @@ fn checkout_win_allows_connect_future_to_be_pooled() {
let sock2 = connector.mock_fut("http://mock.local", rx);
let client = Client::builder()
.build::<_, ::Body>(connector);
.build::<_, crate::Body>(connector);
client.pool.no_timer();
let uri = "http://mock.local/a".parse::<::Uri>().expect("uri parse");
let uri = "http://mock.local/a".parse::<crate::Uri>().expect("uri parse");
// First request just sets us up to have a connection able to be put
// back in the pool. *However*, it doesn't insert immediately. The
@@ -214,7 +214,7 @@ fn bench_http1_get_0b(b: &mut test::Bencher) {
let client = Client::builder()
.build::<_, ::Body>(connector.clone());
.build::<_, crate::Body>(connector.clone());
client.pool.no_timer();
@@ -246,7 +246,7 @@ fn bench_http1_get_10b(b: &mut test::Bencher) {
let client = Client::builder()
.build::<_, ::Body>(connector.clone());
.build::<_, crate::Body>(connector.clone());
client.pool.no_timer();

View File

@@ -3,17 +3,17 @@ use std::sync::Arc;
use futures::future::{Executor, Future};
use body::Payload;
use proto::h2::server::H2Stream;
use server::conn::spawn_all::{NewSvcTask, Watcher};
use service::Service;
use crate::body::Payload;
use crate::proto::h2::server::H2Stream;
use crate::server::conn::spawn_all::{NewSvcTask, Watcher};
use crate::service::Service;
pub trait H2Exec<F, B: Payload>: Clone {
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> ::Result<()>;
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> crate::Result<()>;
}
pub trait NewSvcExec<I, N, S: Service, E, W: Watcher<I, S, E>>: Clone {
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> ::Result<()>;
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()>;
}
// Either the user provides an executor for background tasks, or we use
@@ -27,7 +27,7 @@ pub enum Exec {
// ===== impl Exec =====
impl Exec {
pub(crate) fn execute<F>(&self, fut: F) -> ::Result<()>
pub(crate) fn execute<F>(&self, fut: F) -> crate::Result<()>
where
F: Future<Item=(), Error=()> + Send + 'static,
{
@@ -62,7 +62,7 @@ impl Exec {
.spawn(Box::new(fut))
.map_err(|err| {
warn!("executor error: {:?}", err);
::Error::new_execute(TokioSpawnError)
crate::Error::new_execute(TokioSpawnError)
})
}
#[cfg(not(feature = "runtime"))]
@@ -75,7 +75,7 @@ impl Exec {
e.execute(Box::new(fut))
.map_err(|err| {
warn!("executor error: {:?}", err.kind());
::Error::new_execute("custom executor failed")
crate::Error::new_execute("custom executor failed")
})
},
}
@@ -95,7 +95,7 @@ where
H2Stream<F, B>: Future<Item=(), Error=()> + Send + 'static,
B: Payload,
{
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> ::Result<()> {
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> crate::Result<()> {
self.execute(fut)
}
}
@@ -106,7 +106,7 @@ where
S: Service,
W: Watcher<I, S, E>,
{
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> ::Result<()> {
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()> {
self.execute(fut)
}
}
@@ -119,11 +119,11 @@ where
H2Stream<F, B>: Future<Item=(), Error=()>,
B: Payload,
{
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> ::Result<()> {
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> crate::Result<()> {
self.execute(fut)
.map_err(|err| {
warn!("executor error: {:?}", err.kind());
::Error::new_execute("custom executor failed")
crate::Error::new_execute("custom executor failed")
})
}
}
@@ -135,11 +135,11 @@ where
S: Service,
W: Watcher<I, S, E>,
{
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> ::Result<()> {
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()> {
self.execute(fut)
.map_err(|err| {
warn!("executor error: {:?}", err.kind());
::Error::new_execute("custom executor failed")
crate::Error::new_execute("custom executor failed")
})
}
}

View File

@@ -133,14 +133,8 @@ impl Error {
self.inner.kind == Kind::IncompleteMessage
}
#[doc(hidden)]
#[cfg_attr(error_source, deprecated(note = "use Error::source instead"))]
pub fn cause2(&self) -> Option<&(dyn StdError + 'static + Sync + Send)> {
self.inner.cause.as_ref().map(|e| &**e)
}
/// Consumes the error, returning its cause.
pub fn into_cause(self) -> Option<Box<dyn StdError + Sync + Send>> {
pub fn into_cause(self) -> Option<Box<dyn StdError + Send + Sync>> {
self.inner.cause
}
@@ -162,28 +156,6 @@ impl Error {
&self.inner.kind
}
#[cfg(not(error_source))]
pub(crate) fn h2_reason(&self) -> h2::Reason {
// Since we don't have access to `Error::source`, we can only
// look so far...
let mut cause = self.cause2();
while let Some(err) = cause {
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
return h2_err
.reason()
.unwrap_or(h2::Reason::INTERNAL_ERROR);
}
cause = err
.downcast_ref::<Error>()
.and_then(Error::cause2);
}
// else
h2::Reason::INTERNAL_ERROR
}
#[cfg(error_source)]
pub(crate) fn h2_reason(&self) -> h2::Reason {
// Find an h2::Reason somewhere in the cause stack, if it exists,
// otherwise assume an INTERNAL_ERROR.
@@ -370,16 +342,6 @@ impl StdError for Error {
}
}
#[cfg(not(error_source))]
fn cause(&self) -> Option<&StdError> {
self
.inner
.cause
.as_ref()
.map(|cause| &**cause as &StdError)
}
#[cfg(error_source)]
fn source(&self) -> Option<&(dyn StdError + 'static)> {
self
.inner

View File

@@ -53,10 +53,10 @@ pub use http::{
Version,
};
pub use client::Client;
pub use error::{Result, Error};
pub use body::{Body, Chunk};
pub use server::Server;
pub use crate::client::Client;
pub use crate::error::{Result, Error};
pub use crate::body::{Body, Chunk};
pub use crate::server::Server;
#[macro_use]
mod common;

View File

@@ -13,7 +13,7 @@ use futures::task::{self, Task};
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "runtime")]
use ::client::connect::{Connect, Connected, Destination};
use crate::client::connect::{Connect, Connected, Destination};
#[derive(Debug)]
pub struct MockCursor {

View File

@@ -8,9 +8,9 @@ use http::{HeaderMap, Method, Version};
use http::header::{HeaderValue, CONNECTION};
use tokio_io::{AsyncRead, AsyncWrite};
use ::Chunk;
use proto::{BodyLength, DecodedLength, MessageHead};
use headers::connection_keep_alive;
use crate::Chunk;
use crate::proto::{BodyLength, DecodedLength, MessageHead};
use crate::headers::connection_keep_alive;
use super::io::{Buffered};
use super::{EncodedBuf, Encode, Encoder, /*Decode,*/ Decoder, Http1Transaction, ParseContext};
@@ -84,7 +84,7 @@ where I: AsyncRead + AsyncWrite,
self.io.into_inner()
}
pub fn pending_upgrade(&mut self) -> Option<::upgrade::Pending> {
pub fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
self.state.upgrade.take()
}
@@ -129,7 +129,7 @@ where I: AsyncRead + AsyncWrite,
read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
}
pub fn read_head(&mut self) -> Poll<Option<(MessageHead<T::Incoming>, DecodedLength, bool)>, ::Error> {
pub fn read_head(&mut self) -> Poll<Option<(MessageHead<T::Incoming>, DecodedLength, bool)>, crate::Error> {
debug_assert!(self.can_read_head());
trace!("Conn::read_head");
@@ -168,7 +168,7 @@ where I: AsyncRead + AsyncWrite,
Ok(Async::Ready(Some((msg.head, msg.decode, msg.wants_upgrade))))
}
fn on_read_head_error<Z>(&mut self, e: ::Error) -> Poll<Option<Z>, ::Error> {
fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<Z>, crate::Error> {
// If we are currently waiting on a message, then an empty
// message should be reported as an error. If not, it is just
// the connection closing gracefully.
@@ -233,7 +233,7 @@ where I: AsyncRead + AsyncWrite,
ret
}
pub fn read_keep_alive(&mut self) -> Poll<(), ::Error> {
pub fn read_keep_alive(&mut self) -> Poll<(), crate::Error> {
debug_assert!(!self.can_read_head() && !self.can_read_body());
if self.is_mid_message() {
@@ -254,22 +254,22 @@ where I: AsyncRead + AsyncWrite,
//
// This should only be called for Clients wanting to enter the idle
// state.
fn require_empty_read(&mut self) -> Poll<(), ::Error> {
fn require_empty_read(&mut self) -> Poll<(), crate::Error> {
debug_assert!(!self.can_read_head() && !self.can_read_body());
debug_assert!(!self.is_mid_message());
debug_assert!(T::is_client());
if !self.io.read_buf().is_empty() {
debug!("received an unexpected {} bytes", self.io.read_buf().len());
return Err(::Error::new_unexpected_message());
return Err(crate::Error::new_unexpected_message());
}
let num_read = try_ready!(self.force_io_read().map_err(::Error::new_io));
let num_read = try_ready!(self.force_io_read().map_err(crate::Error::new_io));
if num_read == 0 {
let ret = if self.should_error_on_eof() {
trace!("found unexpected EOF on busy connection: {:?}", self.state);
Err(::Error::new_incomplete())
Err(crate::Error::new_incomplete())
} else {
trace!("found EOF on idle connection, closing");
Ok(Async::Ready(()))
@@ -281,10 +281,10 @@ where I: AsyncRead + AsyncWrite,
}
debug!("received unexpected {} bytes on an idle connection", num_read);
Err(::Error::new_unexpected_message())
Err(crate::Error::new_unexpected_message())
}
fn mid_message_detect_eof(&mut self) -> Poll<(), ::Error> {
fn mid_message_detect_eof(&mut self) -> Poll<(), crate::Error> {
debug_assert!(!self.can_read_head() && !self.can_read_body());
debug_assert!(self.is_mid_message());
@@ -292,12 +292,12 @@ where I: AsyncRead + AsyncWrite,
return Ok(Async::NotReady);
}
let num_read = try_ready!(self.force_io_read().map_err(::Error::new_io));
let num_read = try_ready!(self.force_io_read().map_err(crate::Error::new_io));
if num_read == 0 {
trace!("found unexpected EOF on busy connection: {:?}", self.state);
self.state.close_read();
Err(::Error::new_incomplete())
Err(crate::Error::new_incomplete())
} else {
Ok(Async::Ready(()))
}
@@ -563,12 +563,12 @@ where I: AsyncRead + AsyncWrite,
//
// - Client: there is nothing we can do
// - Server: if Response hasn't been written yet, we can send a 4xx response
fn on_parse_error(&mut self, err: ::Error) -> ::Result<()> {
fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
match self.state.writing {
Writing::Init => {
if self.has_h2_prefix() {
return Err(::Error::new_version_h2())
return Err(crate::Error::new_version_h2())
}
if let Some(msg) = T::on_error(&err) {
// Drop the cached headers so as to not trigger a debug
@@ -623,7 +623,7 @@ where I: AsyncRead + AsyncWrite,
}
}
pub fn take_error(&mut self) -> ::Result<()> {
pub fn take_error(&mut self) -> crate::Result<()> {
if let Some(err) = self.state.error.take() {
Err(err)
} else {
@@ -631,7 +631,7 @@ where I: AsyncRead + AsyncWrite,
}
}
pub(super) fn on_upgrade(&mut self) -> ::upgrade::OnUpgrade {
pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
trace!("{}: prepare possible HTTP upgrade", T::LOG);
self.state.prepare_upgrade()
}
@@ -658,7 +658,7 @@ struct State {
cached_headers: Option<HeaderMap>,
/// If an error occurs when there wasn't a direct way to return it
/// back to the user, this is set.
error: Option<::Error>,
error: Option<crate::Error>,
/// Current keep-alive status.
keep_alive: KA,
/// If mid-message, the HTTP Method that started it.
@@ -675,7 +675,7 @@ struct State {
/// State of allowed writes
writing: Writing,
/// An expected pending HTTP upgrade.
upgrade: Option<::upgrade::Pending>,
upgrade: Option<crate::upgrade::Pending>,
/// Either HTTP/1.0 or 1.1 connection
version: Version,
}
@@ -868,9 +868,9 @@ impl State {
}
}
fn prepare_upgrade(&mut self) -> ::upgrade::OnUpgrade {
fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
debug_assert!(self.upgrade.is_none());
let (tx, rx) = ::upgrade::pending();
let (tx, rx) = crate::upgrade::pending();
self.upgrade = Some(tx);
rx
}
@@ -888,9 +888,9 @@ mod tests {
let len = s.len();
b.bytes = len as u64;
let mut io = ::mock::AsyncIo::new_buf(Vec::new(), 0);
let mut io = crate::mock::AsyncIo::new_buf(Vec::new(), 0);
io.panic();
let mut conn = Conn::<_, ::Chunk, ::proto::h1::ServerTransaction>::new(io);
let mut conn = Conn::<_, crate::Chunk, crate::proto::h1::ServerTransaction>::new(io);
*conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
conn.state.cached_headers = Some(HeaderMap::with_capacity(2));

View File

@@ -323,7 +323,7 @@ mod tests {
use super::super::io::MemRead;
use futures::{Async, Poll};
use bytes::{BytesMut, Bytes};
use mock::AsyncIo;
use crate::mock::AsyncIo;
impl<'a> MemRead for &'a [u8] {
fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {

View File

@@ -5,17 +5,17 @@ use futures::{Async, Future, Poll, Stream};
use http::{Request, Response, StatusCode};
use tokio_io::{AsyncRead, AsyncWrite};
use body::{Body, Payload};
use body::internal::FullDataArg;
use common::{Never, YieldNow};
use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead};
use crate::body::{Body, Payload};
use crate::body::internal::FullDataArg;
use crate::common::{Never, YieldNow};
use crate::proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead};
use super::Http1Transaction;
use service::Service;
use crate::service::Service;
pub(crate) struct Dispatcher<D, Bs: Payload, I, T> {
conn: Conn<I, Bs::Data, T>,
dispatch: D,
body_tx: Option<::body::Sender>,
body_tx: Option<crate::body::Sender>,
body_rx: Option<Bs>,
is_closing: bool,
/// If the poll loop reaches its max spin count, it will yield by notifying
@@ -30,7 +30,7 @@ pub(crate) trait Dispatch {
type PollError;
type RecvItem;
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError>;
fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>;
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
fn poll_ready(&mut self) -> Poll<(), ()>;
fn should_poll(&self) -> bool;
}
@@ -41,11 +41,11 @@ pub struct Server<S: Service> {
}
pub struct Client<B> {
callback: Option<::client::dispatch::Callback<Request<B>, Response<Body>>>,
callback: Option<crate::client::dispatch::Callback<Request<B>, Response<Body>>>,
rx: ClientRx<B>,
}
type ClientRx<B> = ::client::dispatch::Receiver<Request<B>, Response<Body>>;
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>;
impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
where
@@ -80,7 +80,7 @@ where
///
/// This is useful for old-style HTTP upgrades, but ignores
/// newer-style upgrade API.
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
pub fn poll_without_shutdown(&mut self) -> Poll<(), crate::Error> {
self.poll_catch(false)
.map(|x| {
x.map(|ds| if let Dispatched::Upgrade(pending) = ds {
@@ -89,7 +89,7 @@ where
})
}
fn poll_catch(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error> {
fn poll_catch(&mut self, should_shutdown: bool) -> Poll<Dispatched, crate::Error> {
self.poll_inner(should_shutdown).or_else(|e| {
// An error means we're shutting down either way.
// We just try to give the error to the user,
@@ -100,7 +100,7 @@ where
})
}
fn poll_inner(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error> {
fn poll_inner(&mut self, should_shutdown: bool) -> Poll<Dispatched, crate::Error> {
T::update_date();
try_ready!(self.poll_loop());
@@ -110,7 +110,7 @@ where
self.conn.take_error()?;
return Ok(Async::Ready(Dispatched::Upgrade(pending)));
} else if should_shutdown {
try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown));
try_ready!(self.conn.shutdown().map_err(crate::Error::new_shutdown));
}
self.conn.take_error()?;
Ok(Async::Ready(Dispatched::Shutdown))
@@ -119,7 +119,7 @@ where
}
}
fn poll_loop(&mut self) -> Poll<(), ::Error> {
fn poll_loop(&mut self) -> Poll<(), crate::Error> {
// Limit the looping on this connection, in case it is ready far too
// often, so that other futures don't starve.
//
@@ -155,7 +155,7 @@ where
}
}
fn poll_read(&mut self) -> Poll<(), ::Error> {
fn poll_read(&mut self) -> Poll<(), crate::Error> {
loop {
if self.is_closing {
return Ok(Async::Ready(()));
@@ -199,7 +199,7 @@ where
return Ok(Async::NotReady);
}
Err(e) => {
body.send_error(::Error::new_body(e));
body.send_error(crate::Error::new_body(e));
}
}
} else {
@@ -211,7 +211,7 @@ where
}
}
fn poll_read_head(&mut self) -> Poll<(), ::Error> {
fn poll_read_head(&mut self) -> Poll<(), crate::Error> {
// can dispatch receive, or does it still care about, an incoming message?
match self.dispatch.poll_ready() {
Ok(Async::Ready(())) => (),
@@ -255,12 +255,12 @@ where
}
}
fn poll_write(&mut self) -> Poll<(), ::Error> {
fn poll_write(&mut self) -> Poll<(), crate::Error> {
loop {
if self.is_closing {
return Ok(Async::Ready(()));
} else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() {
if let Some((head, mut body)) = try_ready!(self.dispatch.poll_msg().map_err(::Error::new_user_service)) {
if let Some((head, mut body)) = try_ready!(self.dispatch.poll_msg().map_err(crate::Error::new_user_service)) {
// Check if the body knows its full data immediately.
//
// If so, we can skip a bit of bookkeeping that streaming
@@ -294,7 +294,7 @@ where
);
continue;
}
match body.poll_data().map_err(::Error::new_user_body)? {
match body.poll_data().map_err(crate::Error::new_user_body)? {
Async::Ready(Some(chunk)) => {
let eos = body.is_end_stream();
if eos {
@@ -327,10 +327,10 @@ where
}
}
fn poll_flush(&mut self) -> Poll<(), ::Error> {
fn poll_flush(&mut self) -> Poll<(), crate::Error> {
self.conn.flush().map_err(|err| {
debug!("error writing: {}", err);
::Error::new_body_write(err)
crate::Error::new_body_write(err)
})
}
@@ -367,7 +367,7 @@ where
Bs: Payload,
{
type Item = Dispatched;
type Error = ::Error;
type Error = crate::Error;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@@ -421,7 +421,7 @@ where
}
}
fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> {
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
let (msg, body) = msg?;
let mut req = Request::new(body);
*req.method_mut() = msg.subject.0;
@@ -501,7 +501,7 @@ where
}
}
fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> {
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
match msg {
Ok((msg, body)) => {
if let Some(cb) = self.callback.take() {
@@ -515,7 +515,7 @@ where
// Getting here is likely a bug! An error should have happened
// in Conn::require_empty_read() before ever parsing a
// full message!
Err(::Error::new_unexpected_message())
Err(crate::Error::new_unexpected_message())
}
},
Err(err) => {
@@ -526,7 +526,7 @@ where
trace!("canceling queued request with connection error: {}", err);
// in this case, the message was never even started, so it's safe to tell
// the user that the request was completely canceled
let _ = cb.send(Err((::Error::new_canceled().with(err), Some(req))));
let _ = cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
Ok(())
} else {
Err(err)
@@ -559,8 +559,8 @@ mod tests {
extern crate pretty_env_logger;
use super::*;
use mock::AsyncIo;
use proto::h1::ClientTransaction;
use crate::mock::AsyncIo;
use crate::proto::h1::ClientTransaction;
#[test]
fn client_read_bytes_before_writing_request() {
@@ -569,8 +569,8 @@ mod tests {
// Block at 0 for now, but we will release this response before
// the request is ready to write later...
let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0);
let (mut tx, rx) = ::client::dispatch::channel();
let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io);
let (mut tx, rx) = crate::client::dispatch::channel();
let conn = Conn::<_, crate::Chunk, ClientTransaction>::new(io);
let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
// First poll is needed to allow tx to send...
@@ -578,7 +578,7 @@ mod tests {
// Unblock our IO, which has a response before we've sent request!
dispatcher.conn.io_mut().block_in(100);
let res_rx = tx.try_send(::Request::new(::Body::empty())).unwrap();
let res_rx = tx.try_send(crate::Request::new(crate::Body::empty())).unwrap();
let a1 = dispatcher.poll().expect("error should be sent on channel");
assert!(a1.is_ready(), "dispatcher should be closed");
@@ -587,7 +587,7 @@ mod tests {
.expect_err("callback response");
match (err.0.kind(), err.1) {
(&::error::Kind::Canceled, Some(_)) => (),
(&crate::error::Kind::Canceled, Some(_)) => (),
other => panic!("expected Canceled, got {:?}", other),
}
Ok::<(), ()>(())
@@ -599,16 +599,16 @@ mod tests {
let _ = pretty_env_logger::try_init();
::futures::lazy(|| {
let io = AsyncIo::new_buf(vec![], 0);
let (mut tx, rx) = ::client::dispatch::channel();
let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io);
let (mut tx, rx) = crate::client::dispatch::channel();
let conn = Conn::<_, crate::Chunk, ClientTransaction>::new(io);
let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
// First poll is needed to allow tx to send...
assert!(dispatcher.poll().expect("nothing is ready").is_not_ready());
let body = ::Body::wrap_stream(::futures::stream::once(Ok::<_, ::Error>("")));
let body = crate::Body::wrap_stream(::futures::stream::once(Ok::<_, crate::Error>("")));
let _res_rx = tx.try_send(::Request::new(body)).unwrap();
let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();
dispatcher.poll().expect("empty body shouldn't panic");
Ok::<(), ()>(())

View File

@@ -4,7 +4,7 @@ use bytes::{Buf, IntoBuf};
use bytes::buf::{Chain, Take};
use iovec::IoVec;
use common::StaticBuf;
use crate::common::StaticBuf;
use super::io::WriteBuf;
/// Encoders to handle different Transfer-Encodings.

View File

@@ -136,7 +136,7 @@ where
}
pub(super) fn parse<S>(&mut self, ctx: ParseContext)
-> Poll<ParsedMessage<S::Incoming>, ::Error>
-> Poll<ParsedMessage<S::Incoming>, crate::Error>
where
S: Http1Transaction,
{
@@ -153,14 +153,14 @@ where
let max = self.read_buf_strategy.max();
if self.read_buf.len() >= max {
debug!("max_buf_size ({}) reached, closing", max);
return Err(::Error::new_too_large());
return Err(crate::Error::new_too_large());
}
},
}
match try_ready!(self.read_from_io().map_err(::Error::new_io)) {
match try_ready!(self.read_from_io().map_err(crate::Error::new_io)) {
0 => {
trace!("parse eof");
return Err(::Error::new_incomplete());
return Err(crate::Error::new_incomplete());
}
_ => {},
}
@@ -651,13 +651,13 @@ impl<T: Buf> Buf for BufDeque<T> {
mod tests {
use super::*;
use std::io::Read;
use mock::AsyncIo;
use crate::mock::AsyncIo;
#[cfg(feature = "nightly")]
use test::Bencher;
#[cfg(test)]
impl<T: Read> MemRead for ::mock::AsyncIo<T> {
impl<T: Read> MemRead for crate::mock::AsyncIo<T> {
fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
let mut v = vec![0; len];
let n = try_nb!(self.read(v.as_mut_slice()));
@@ -689,7 +689,7 @@ mod tests {
cached_headers: &mut None,
req_method: &mut None,
};
assert!(buffered.parse::<::proto::h1::ClientTransaction>(ctx).unwrap().is_not_ready());
assert!(buffered.parse::<crate::proto::h1::ClientTransaction>(ctx).unwrap().is_not_ready());
assert!(buffered.io.blocked());
}
@@ -890,10 +890,10 @@ mod tests {
let s = "Hello, World!";
b.bytes = s.len() as u64;
let mut write_buf = WriteBuf::<::Chunk>::new();
let mut write_buf = WriteBuf::<crate::Chunk>::new();
write_buf.set_strategy(WriteStrategy::Flatten);
b.iter(|| {
let chunk = ::Chunk::from(s);
let chunk = crate::Chunk::from(s);
write_buf.buffer(chunk);
::test::black_box(&write_buf);
write_buf.headers.bytes.clear();

View File

@@ -1,7 +1,7 @@
use bytes::BytesMut;
use http::{HeaderMap, Method};
use proto::{MessageHead, BodyLength, DecodedLength};
use crate::proto::{MessageHead, BodyLength, DecodedLength};
pub(crate) use self::conn::Conn;
pub(crate) use self::dispatch::Dispatcher;
@@ -27,9 +27,9 @@ pub(crate) trait Http1Transaction {
type Outgoing: Default;
const LOG: &'static str;
fn parse(bytes: &mut BytesMut, ctx: ParseContext) -> ParseResult<Self::Incoming>;
fn encode(enc: Encode<Self::Outgoing>, dst: &mut Vec<u8>) -> ::Result<Encoder>;
fn encode(enc: Encode<Self::Outgoing>, dst: &mut Vec<u8>) -> crate::Result<Encoder>;
fn on_error(err: &::Error) -> Option<MessageHead<Self::Outgoing>>;
fn on_error(err: &crate::Error) -> Option<MessageHead<Self::Outgoing>>;
fn is_client() -> bool {
!Self::is_server()
@@ -51,7 +51,7 @@ pub(crate) trait Http1Transaction {
}
/// Result newtype for Http1Transaction::parse.
pub(crate) type ParseResult<T> = Result<Option<ParsedMessage<T>>, ::error::Parse>;
pub(crate) type ParseResult<T> = Result<Option<ParsedMessage<T>>, crate::error::Parse>;
#[derive(Debug)]
pub(crate) struct ParsedMessage<T> {

View File

@@ -10,10 +10,10 @@ use http::header::{self, Entry, HeaderName, HeaderValue};
use http::{HeaderMap, Method, StatusCode, Version};
use httparse;
use error::Parse;
use headers;
use proto::{BodyLength, DecodedLength, MessageHead, RequestLine, RequestHead};
use proto::h1::{Encode, Encoder, Http1Transaction, ParseResult, ParseContext, ParsedMessage, date};
use crate::error::Parse;
use crate::headers;
use crate::proto::{BodyLength, DecodedLength, MessageHead, RequestLine, RequestHead};
use crate::proto::h1::{Encode, Encoder, Http1Transaction, ParseResult, ParseContext, ParsedMessage, date};
const MAX_HEADERS: usize = 100;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@@ -239,7 +239,7 @@ impl Http1Transaction for Server {
}))
}
fn encode(mut msg: Encode<Self::Outgoing>, mut dst: &mut Vec<u8>) -> ::Result<Encoder> {
fn encode(mut msg: Encode<Self::Outgoing>, mut dst: &mut Vec<u8>) -> crate::Result<Encoder> {
trace!(
"Server::encode status={:?}, body={:?}, req_method={:?}",
msg.head.subject,
@@ -266,7 +266,7 @@ impl Http1Transaction for Server {
*msg.head = MessageHead::default();
msg.head.subject = StatusCode::INTERNAL_SERVER_ERROR;
msg.body = None;
(Err(::Error::new_user_unsupported_status_code()), true)
(Err(crate::Error::new_user_unsupported_status_code()), true)
} else {
(Ok(()), !msg.keep_alive)
};
@@ -309,7 +309,7 @@ impl Http1Transaction for Server {
if wrote_len {
warn!("unexpected content-length found, canceling");
rewind(dst);
return Err(::Error::new_user_header());
return Err(crate::Error::new_user_header());
}
match msg.body {
Some(BodyLength::Known(known_len)) => {
@@ -369,7 +369,7 @@ impl Http1Transaction for Server {
if fold.0 != len {
warn!("multiple Content-Length values found: [{}, {}]", fold.0, len);
rewind(dst);
return Err(::Error::new_user_header());
return Err(crate::Error::new_user_header());
}
folded = Some(fold);
} else {
@@ -378,7 +378,7 @@ impl Http1Transaction for Server {
} else {
warn!("illegal Content-Length value: {:?}", value);
rewind(dst);
return Err(::Error::new_user_header());
return Err(crate::Error::new_user_header());
}
}
if let Some((len, value)) = folded {
@@ -418,7 +418,7 @@ impl Http1Transaction for Server {
if wrote_len {
warn!("unexpected transfer-encoding found, canceling");
rewind(dst);
return Err(::Error::new_user_header());
return Err(crate::Error::new_user_header());
}
// check that we actually can send a chunked body...
if msg.head.version == Version::HTTP_10 || !Server::can_chunked(msg.req_method, msg.head.subject) {
@@ -531,8 +531,8 @@ impl Http1Transaction for Server {
ret.map(|()| encoder.set_last(is_last))
}
fn on_error(err: &::Error) -> Option<MessageHead<Self::Outgoing>> {
use ::error::Kind;
fn on_error(err: &crate::Error) -> Option<MessageHead<Self::Outgoing>> {
use crate::error::Kind;
let status = match *err.kind() {
Kind::Parse(Parse::Method) |
Kind::Parse(Parse::Header) |
@@ -666,7 +666,7 @@ impl Http1Transaction for Client {
}
}
fn encode(msg: Encode<Self::Outgoing>, dst: &mut Vec<u8>) -> ::Result<Encoder> {
fn encode(msg: Encode<Self::Outgoing>, dst: &mut Vec<u8>) -> crate::Result<Encoder> {
trace!("Client::encode method={:?}, body={:?}", msg.head.subject.0, msg.body);
*msg.req_method = Some(msg.head.subject.0.clone());
@@ -704,7 +704,7 @@ impl Http1Transaction for Client {
Ok(body)
}
fn on_error(_err: &::Error) -> Option<MessageHead<Self::Outgoing>> {
fn on_error(_err: &crate::Error) -> Option<MessageHead<Self::Outgoing>> {
// we can't tell the server about any errors it creates
None
}
@@ -937,7 +937,7 @@ fn record_header_indices(
bytes: &[u8],
headers: &[httparse::Header],
indices: &mut [HeaderIndices]
) -> Result<(), ::error::Parse> {
) -> Result<(), crate::error::Parse> {
let bytes_ptr = bytes.as_ptr() as usize;
// FIXME: This should be a single plain `for` loop.
@@ -966,7 +966,7 @@ fn record_header_indices(
{
if header.name.len() >= (1 << 16) {
debug!("header name larger than 64kb: {:?}", header.name);
return Err(::error::Parse::TooLarge);
return Err(crate::error::Parse::TooLarge);
}
let name_start = header.name.as_ptr() as usize - bytes_ptr;
let name_end = name_start + header.name.len();
@@ -1071,12 +1071,12 @@ mod tests {
req_method: &mut method,
}).unwrap().unwrap();
assert_eq!(raw.len(), 0);
assert_eq!(msg.head.subject.0, ::Method::GET);
assert_eq!(msg.head.subject.0, crate::Method::GET);
assert_eq!(msg.head.subject.1, "/echo");
assert_eq!(msg.head.version, ::Version::HTTP_11);
assert_eq!(msg.head.version, crate::Version::HTTP_11);
assert_eq!(msg.head.headers.len(), 1);
assert_eq!(msg.head.headers["Host"], "hyper.rs");
assert_eq!(method, Some(::Method::GET));
assert_eq!(method, Some(crate::Method::GET));
}
@@ -1087,12 +1087,12 @@ mod tests {
let mut raw = BytesMut::from(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec());
let ctx = ParseContext {
cached_headers: &mut None,
req_method: &mut Some(::Method::GET),
req_method: &mut Some(crate::Method::GET),
};
let msg = Client::parse(&mut raw, ctx).unwrap().unwrap();
assert_eq!(raw.len(), 0);
assert_eq!(msg.head.subject, ::StatusCode::OK);
assert_eq!(msg.head.version, ::Version::HTTP_11);
assert_eq!(msg.head.subject, crate::StatusCode::OK);
assert_eq!(msg.head.version, crate::Version::HTTP_11);
assert_eq!(msg.head.headers.len(), 1);
assert_eq!(msg.head.headers["Content-Length"], "0");
}
@@ -1120,7 +1120,7 @@ mod tests {
.expect("parse complete")
}
fn parse_err(s: &str, comment: &str) -> ::error::Parse {
fn parse_err(s: &str, comment: &str) -> crate::error::Parse {
let mut bytes = BytesMut::from(s);
Server::parse(&mut bytes, ParseContext {
cached_headers: &mut None,
@@ -1266,7 +1266,7 @@ mod tests {
.expect("parse complete")
}
fn parse_err(s: &str) -> ::error::Parse {
fn parse_err(s: &str) -> crate::error::Parse {
let mut bytes = BytesMut::from(s);
Client::parse(&mut bytes, ParseContext {
cached_headers: &mut None,
@@ -1423,7 +1423,7 @@ mod tests {
#[test]
fn test_client_request_encode_title_case() {
use http::header::HeaderValue;
use proto::BodyLength;
use crate::proto::BodyLength;
let mut head = MessageHead::default();
head.headers.insert("content-length", HeaderValue::from_static("10"));
@@ -1553,7 +1553,7 @@ mod tests {
#[bench]
fn bench_server_encode_headers_preset(b: &mut Bencher) {
use http::header::HeaderValue;
use proto::BodyLength;
use crate::proto::BodyLength;
let len = 108;
b.bytes = len as u64;
@@ -1581,7 +1581,7 @@ mod tests {
#[cfg(feature = "nightly")]
#[bench]
fn bench_server_encode_no_headers(b: &mut Bencher) {
use proto::BodyLength;
use crate::proto::BodyLength;
let len = 76;
b.bytes = len as u64;

View File

@@ -5,15 +5,15 @@ use futures::sync::{mpsc, oneshot};
use h2::client::{Builder, Handshake, SendRequest};
use tokio_io::{AsyncRead, AsyncWrite};
use headers::content_length_parse_all;
use body::Payload;
use ::common::{Exec, Never};
use headers;
use ::proto::Dispatched;
use crate::headers::content_length_parse_all;
use crate::body::Payload;
use crate::common::{Exec, Never};
use crate::headers;
use crate::proto::Dispatched;
use super::{PipeToSendStream, SendBuf};
use ::{Body, Request, Response};
use crate::{Body, Request, Response};
type ClientRx<B> = ::client::dispatch::Receiver<Request<B>, Response<Body>>;
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>;
/// An mpsc channel is used to help notify the `Connection` task when *all*
/// other handles to it have been dropped, so that it can shutdown.
type ConnDropRef = mpsc::Sender<Never>;
@@ -58,13 +58,13 @@ where
B: Payload + 'static,
{
type Item = Dispatched;
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match self.state {
State::Handshaking(ref mut h) => {
let (request_tx, conn) = try_ready!(h.poll().map_err(::Error::new_h2));
let (request_tx, conn) = try_ready!(h.poll().map_err(crate::Error::new_h2));
// An mpsc channel is used entirely to detect when the
// 'Client' has been dropped. This is to get around a bug
// in h2 where dropping all SendRequests won't notify a
@@ -111,7 +111,7 @@ where
trace!("connection gracefully shutdown");
Ok(Async::Ready(Dispatched::Shutdown))
} else {
Err(::Error::new_h2(err))
Err(crate::Error::new_h2(err))
};
}
}
@@ -133,7 +133,7 @@ where
Ok(ok) => ok,
Err(err) => {
debug!("client send request error: {}", err);
cb.send(Err((::Error::new_h2(err), None)));
cb.send(Err((crate::Error::new_h2(err), None)));
continue;
}
};
@@ -162,12 +162,12 @@ where
Ok(res) => {
let content_length = content_length_parse_all(res.headers());
let res = res.map(|stream|
::Body::h2(stream, content_length));
crate::Body::h2(stream, content_length));
Ok(res)
},
Err(err) => {
debug!("client response error: {}", err);
Err((::Error::new_h2(err), None))
Err((crate::Error::new_h2(err), None))
}
}
});

View File

@@ -7,7 +7,7 @@ use http::header::{
};
use http::HeaderMap;
use body::Payload;
use crate::body::Payload;
mod client;
pub(crate) mod server;
@@ -91,18 +91,18 @@ where
}
}
fn on_user_err(&mut self, err: S::Error) -> ::Error {
let err = ::Error::new_user_body(err);
fn on_user_err(&mut self, err: S::Error) -> crate::Error {
let err = crate::Error::new_user_body(err);
debug!("send body user stream error: {}", err);
self.body_tx.send_reset(err.h2_reason());
err
}
fn send_eos_frame(&mut self) -> ::Result<()> {
fn send_eos_frame(&mut self) -> crate::Result<()> {
trace!("send body eos");
self.body_tx
.send_data(SendBuf(None), true)
.map_err(::Error::new_body_write)
.map_err(crate::Error::new_body_write)
}
}
@@ -111,7 +111,7 @@ where
S: Payload,
{
type Item = ();
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
@@ -123,18 +123,18 @@ where
if self.body_tx.capacity() == 0 {
loop {
match try_ready!(self.body_tx.poll_capacity().map_err(::Error::new_body_write)) {
match try_ready!(self.body_tx.poll_capacity().map_err(crate::Error::new_body_write)) {
Some(0) => {}
Some(_) => break,
None => return Err(::Error::new_canceled()),
None => return Err(crate::Error::new_canceled()),
}
}
} else {
if let Async::Ready(reason) =
self.body_tx.poll_reset().map_err(::Error::new_body_write)?
self.body_tx.poll_reset().map_err(crate::Error::new_body_write)?
{
debug!("stream received RST_STREAM: {:?}", reason);
return Err(::Error::new_body_write(::h2::Error::from(reason)));
return Err(crate::Error::new_body_write(::h2::Error::from(reason)));
}
}
@@ -150,7 +150,7 @@ where
let buf = SendBuf(Some(chunk));
self.body_tx
.send_data(buf, is_eos)
.map_err(::Error::new_body_write)?;
.map_err(crate::Error::new_body_write)?;
if is_eos {
return Ok(Async::Ready(()));
@@ -169,17 +169,17 @@ where
}
} else {
if let Async::Ready(reason) =
self.body_tx.poll_reset().map_err(|e| ::Error::new_body_write(e))?
self.body_tx.poll_reset().map_err(|e| crate::Error::new_body_write(e))?
{
debug!("stream received RST_STREAM: {:?}", reason);
return Err(::Error::new_body_write(::h2::Error::from(reason)));
return Err(crate::Error::new_body_write(::h2::Error::from(reason)));
}
match try_ready!(self.stream.poll_trailers().map_err(|e| self.on_user_err(e))) {
Some(trailers) => {
self.body_tx
.send_trailers(trailers)
.map_err(::Error::new_body_write)?;
.map_err(crate::Error::new_body_write)?;
return Ok(Async::Ready(()));
}
None => {

View File

@@ -5,16 +5,16 @@ use h2::Reason;
use h2::server::{Builder, Connection, Handshake, SendResponse};
use tokio_io::{AsyncRead, AsyncWrite};
use ::headers::content_length_parse_all;
use ::body::Payload;
use body::internal::FullDataArg;
use ::common::exec::H2Exec;
use ::headers;
use ::service::Service;
use ::proto::Dispatched;
use crate::headers::content_length_parse_all;
use crate::body::Payload;
use crate::body::internal::FullDataArg;
use crate::common::exec::H2Exec;
use crate::headers;
use crate::service::Service;
use crate::proto::Dispatched;
use super::{PipeToSendStream, SendBuf};
use ::{Body, Response};
use crate::{Body, Response};
pub(crate) struct Server<T, S, B, E>
where
@@ -40,7 +40,7 @@ where
B: Payload,
{
conn: Connection<T, SendBuf<B::Data>>,
closing: Option<::Error>,
closing: Option<crate::Error>,
}
@@ -90,13 +90,13 @@ where
E: H2Exec<S::Future, B>,
{
type Item = Dispatched;
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match self.state {
State::Handshaking(ref mut h) => {
let conn = try_ready!(h.poll().map_err(::Error::new_h2));
let conn = try_ready!(h.poll().map_err(crate::Error::new_h2));
State::Serving(Serving {
conn,
closing: None,
@@ -122,7 +122,7 @@ where
T: AsyncRead + AsyncWrite,
B: Payload,
{
fn poll_server<S, E>(&mut self, service: &mut S, exec: &E) -> Poll<(), ::Error>
fn poll_server<S, E>(&mut self, service: &mut S, exec: &E) -> Poll<(), crate::Error>
where
S: Service<
ReqBody=Body,
@@ -138,12 +138,12 @@ where
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => {
// use `poll_close` instead of `poll`, in order to avoid accepting a request.
try_ready!(self.conn.poll_close().map_err(::Error::new_h2));
try_ready!(self.conn.poll_close().map_err(crate::Error::new_h2));
trace!("incoming connection complete");
return Ok(Async::Ready(()));
}
Err(err) => {
let err = ::Error::new_user_service(err);
let err = crate::Error::new_user_service(err);
debug!("service closed: {}", err);
let reason = err.h2_reason();
@@ -161,11 +161,11 @@ where
}
// When the service is ready, accepts an incoming request.
if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) {
if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(crate::Error::new_h2)) {
trace!("incoming request");
let content_length = content_length_parse_all(req.headers());
let req = req.map(|stream| {
::Body::h2(stream, content_length)
crate::Body::h2(stream, content_length)
});
let fut = H2Stream::new(service.call(req), respond);
exec.execute_h2stream(fut)?;
@@ -179,7 +179,7 @@ where
debug_assert!(self.closing.is_some(), "poll_server broke loop without closing");
try_ready!(self.conn.poll_close().map_err(::Error::new_h2));
try_ready!(self.conn.poll_close().map_err(crate::Error::new_h2));
Err(self.closing.take().expect("polled after error"))
}
@@ -215,7 +215,7 @@ where
}
}
fn poll2(&mut self) -> Poll<(), ::Error> {
fn poll2(&mut self) -> Poll<(), crate::Error> {
loop {
let next = match self.state {
H2StreamState::Service(ref mut h) => {
@@ -225,15 +225,15 @@ where
// Body is not yet ready, so we want to check if the client has sent a
// RST_STREAM frame which would cancel the current request.
if let Async::Ready(reason) =
self.reply.poll_reset().map_err(|e| ::Error::new_h2(e))?
self.reply.poll_reset().map_err(|e| crate::Error::new_h2(e))?
{
debug!("stream received RST_STREAM: {:?}", reason);
return Err(::Error::new_h2(reason.into()));
return Err(crate::Error::new_h2(reason.into()));
}
return Ok(Async::NotReady);
}
Err(e) => {
let err = ::Error::new_user_service(e);
let err = crate::Error::new_user_service(e);
warn!("http2 service errored: {}", err);
self.reply.send_reset(err.h2_reason());
return Err(err);
@@ -249,7 +249,7 @@ where
.headers_mut()
.entry(::http::header::DATE)
.expect("DATE is a valid HeaderName")
.or_insert_with(::proto::h1::date::update_and_header_value);
.or_insert_with(crate::proto::h1::date::update_and_header_value);
macro_rules! reply {
($eos:expr) => ({
@@ -258,7 +258,7 @@ where
Err(e) => {
debug!("send response error: {}", e);
self.reply.send_reset(Reason::INTERNAL_ERROR);
return Err(::Error::new_h2(e));
return Err(crate::Error::new_h2(e));
}
}
})
@@ -274,7 +274,7 @@ where
let buf = SendBuf(Some(full));
body_tx
.send_data(buf, true)
.map_err(::Error::new_body_write)?;
.map_err(crate::Error::new_body_write)?;
return Ok(Async::Ready(()));
}

View File

@@ -40,7 +40,7 @@ pub(crate) enum Dispatched {
/// Dispatcher completely shutdown connection.
Shutdown,
/// Dispatcher has pending upgrade, and so did not shutdown.
Upgrade(::upgrade::Pending),
Upgrade(crate::upgrade::Pending),
}
/// A separate module to encapsulate the invariants of the DecodedLength type.
@@ -83,12 +83,12 @@ mod body_length {
}
/// Checks the `u64` is within the maximum allowed for content-length.
pub(crate) fn checked_new(len: u64) -> Result<Self, ::error::Parse> {
pub(crate) fn checked_new(len: u64) -> Result<Self, crate::error::Parse> {
if len <= MAX_LEN {
Ok(DecodedLength(len))
} else {
warn!("content-length bigger than maximum: {} > {}", len, MAX_LEN);
Err(::error::Parse::TooLarge)
Err(crate::error::Parse::TooLarge)
}
}
}

View File

@@ -22,13 +22,13 @@ use h2;
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "runtime")] use tokio_reactor::Handle;
use body::{Body, Payload};
use common::exec::{Exec, H2Exec, NewSvcExec};
use common::io::Rewind;
use error::{Kind, Parse};
use proto;
use service::{MakeServiceRef, Service};
use upgrade::Upgraded;
use crate::body::{Body, Payload};
use crate::common::exec::{Exec, H2Exec, NewSvcExec};
use crate::common::io::Rewind;
use crate::error::{Kind, Parse};
use crate::proto;
use crate::service::{MakeServiceRef, Service};
use crate::upgrade::Upgraded;
pub(super) use self::spawn_all::NoopWatcher;
use self::spawn_all::NewSvcTask;
@@ -413,7 +413,7 @@ impl<E> Http<E> {
/// `make_service` object provided, creating a new service per
/// connection.
#[cfg(feature = "runtime")]
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, make_service: S) -> ::Result<Serve<AddrIncoming, S, E>>
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, make_service: S) -> crate::Result<Serve<AddrIncoming, S, E>>
where
S: MakeServiceRef<
AddrStream,
@@ -438,7 +438,7 @@ impl<E> Http<E> {
/// `make_service` object provided, creating a new service per
/// connection.
#[cfg(feature = "runtime")]
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, make_service: S) -> ::Result<Serve<AddrIncoming, S, E>>
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, make_service: S) -> crate::Result<Serve<AddrIncoming, S, E>>
where
S: MakeServiceRef<
AddrStream,
@@ -547,7 +547,7 @@ where
/// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
/// to work with this function; or use the `without_shutdown` wrapper.
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
pub fn poll_without_shutdown(&mut self) -> Poll<(), crate::Error> {
loop {
let polled = match *self.conn.as_mut().unwrap() {
Either::A(ref mut h1) => h1.poll_without_shutdown(),
@@ -570,9 +570,9 @@ where
/// Prevent shutdown of the underlying IO object at the end of service the request,
/// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
pub fn without_shutdown(self) -> impl Future<Item=Parts<I,S>, Error=::Error> {
pub fn without_shutdown(self) -> impl Future<Item=Parts<I,S>, Error=crate::Error> {
let mut conn = Some(self);
::futures::future::poll_fn(move || -> ::Result<_> {
::futures::future::poll_fn(move || -> crate::Result<_> {
try_ready!(conn.as_mut().unwrap().poll_without_shutdown());
Ok(conn.take().unwrap().into_parts().into())
})
@@ -629,7 +629,7 @@ where
E: H2Exec<S::Future, B>,
{
type Item = ();
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
@@ -701,7 +701,7 @@ where
E: H2Exec<<S::Service as Service>::Future, B>,
{
type Item = Connecting<I::Item, S::Future, E>;
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.make_service.poll_ready_ref() {
@@ -709,11 +709,11 @@ where
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
trace!("make_service closed");
return Err(::Error::new_user_make_service(e));
return Err(crate::Error::new_user_make_service(e));
}
}
if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) {
if let Some(io) = try_ready!(self.incoming.poll().map_err(crate::Error::new_accept)) {
let new_fut = self.make_service.make_service_ref(&io);
Ok(Async::Ready(Some(Connecting {
future: new_fut,
@@ -774,7 +774,7 @@ where
B: Payload,
E: H2Exec<<S::Service as Service>::Future, B>,
{
pub(super) fn poll_watch<W>(&mut self, watcher: &W) -> Poll<(), ::Error>
pub(super) fn poll_watch<W>(&mut self, watcher: &W) -> Poll<(), crate::Error>
where
E: NewSvcExec<I::Item, S::Future, S::Service, E, W>,
W: Watcher<I::Item, S::Service, E>,
@@ -795,9 +795,9 @@ pub(crate) mod spawn_all {
use futures::{Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use body::{Body, Payload};
use common::exec::H2Exec;
use service::Service;
use crate::body::{Body, Payload};
use crate::common::exec::H2Exec;
use crate::service::Service;
use super::{Connecting, UpgradeableConnection};
// Used by `SpawnAll` to optionally watch a `Connection` future.
@@ -809,7 +809,7 @@ pub(crate) mod spawn_all {
// connections, and signal that they start to shutdown when prompted, so
// it has a `GracefulWatcher` implementation to do that.
pub trait Watcher<I, S: Service, E>: Clone {
type Future: Future<Item=(), Error=::Error>;
type Future: Future<Item=(), Error=crate::Error>;
fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future;
}
@@ -878,7 +878,7 @@ pub(crate) mod spawn_all {
let conn = try_ready!(connecting
.poll()
.map_err(|err| {
let err = ::Error::new_user_make_service(err);
let err = crate::Error::new_user_make_service(err);
debug!("connecting error: {}", err);
}));
let connected = watcher.watch(conn.with_upgrades());
@@ -941,7 +941,7 @@ mod upgrades {
E: super::H2Exec<S::Future, B>,
{
type Item = ();
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {

View File

@@ -64,9 +64,9 @@ use futures::{Future, Stream, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "runtime")] use tokio_reactor;
use body::{Body, Payload};
use common::exec::{Exec, H2Exec, NewSvcExec};
use service::{MakeServiceRef, Service};
use crate::body::{Body, Payload};
use crate::common::exec::{Exec, H2Exec, NewSvcExec};
use crate::service::{MakeServiceRef, Service};
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
// error that `hyper::server::Http` is private...
use self::conn::{Http as Http_, NoopWatcher, SpawnAll};
@@ -119,13 +119,13 @@ impl Server<AddrIncoming, ()> {
}
/// Tries to bind to the provided address, and returns a [`Builder`](Builder).
pub fn try_bind(addr: &SocketAddr) -> ::Result<Builder<AddrIncoming>> {
pub fn try_bind(addr: &SocketAddr) -> crate::Result<Builder<AddrIncoming>> {
AddrIncoming::new(addr, None)
.map(Server::builder)
}
/// Create a new instance from a `std::net::TcpListener` instance.
pub fn from_tcp(listener: StdTcpListener) -> Result<Builder<AddrIncoming>, ::Error> {
pub fn from_tcp(listener: StdTcpListener) -> Result<Builder<AddrIncoming>, crate::Error> {
let handle = tokio_reactor::Handle::default();
AddrIncoming::from_std(listener, &handle)
.map(Server::builder)
@@ -212,7 +212,7 @@ where
E: NewSvcExec<I::Item, S::Future, S::Service, E, NoopWatcher>,
{
type Item = ();
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.spawn_all.poll_watch(&NoopWatcher)

View File

@@ -3,10 +3,10 @@ use std::error::Error as StdError;
use futures::{Async, Future, Stream, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use body::{Body, Payload};
use common::drain::{self, Draining, Signal, Watch, Watching};
use common::exec::{H2Exec, NewSvcExec};
use service::{MakeServiceRef, Service};
use crate::body::{Body, Payload};
use crate::common::drain::{self, Draining, Signal, Watch, Watching};
use crate::common::exec::{H2Exec, NewSvcExec};
use crate::service::{MakeServiceRef, Service};
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
#[allow(missing_debug_implementations)]
@@ -51,7 +51,7 @@ where
E: NewSvcExec<I::Item, S::Future, S::Service, E, GracefulWatcher>,
{
type Item = ();
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {

View File

@@ -22,9 +22,9 @@ pub struct AddrIncoming {
}
impl AddrIncoming {
pub(super) fn new(addr: &SocketAddr, handle: Option<&Handle>) -> ::Result<Self> {
pub(super) fn new(addr: &SocketAddr, handle: Option<&Handle>) -> crate::Result<Self> {
let std_listener = StdTcpListener::bind(addr)
.map_err(::Error::new_listen)?;
.map_err(crate::Error::new_listen)?;
if let Some(handle) = handle {
AddrIncoming::from_std(std_listener, handle)
@@ -34,10 +34,10 @@ impl AddrIncoming {
}
}
pub(super) fn from_std(std_listener: StdTcpListener, handle: &Handle) -> ::Result<Self> {
pub(super) fn from_std(std_listener: StdTcpListener, handle: &Handle) -> crate::Result<Self> {
let listener = TcpListener::from_std(std_listener, &handle)
.map_err(::Error::new_listen)?;
let addr = listener.local_addr().map_err(::Error::new_listen)?;
.map_err(crate::Error::new_listen)?;
let addr = listener.local_addr().map_err(crate::Error::new_listen)?;
Ok(AddrIncoming {
listener,
addr: addr,
@@ -49,7 +49,7 @@ impl AddrIncoming {
}
/// Creates a new `AddrIncoming` binding to provided socket address.
pub fn bind(addr: &SocketAddr) -> ::Result<Self> {
pub fn bind(addr: &SocketAddr) -> crate::Result<Self> {
AddrIncoming::new(addr, None)
}

View File

@@ -3,7 +3,7 @@ use std::fmt;
use futures::{Async, Future, IntoFuture, Poll};
use body::Payload;
use crate::body::Payload;
use super::Service;
/// An asynchronous constructor of `Service`s.

View File

@@ -2,7 +2,7 @@ use std::error::Error as StdError;
use futures::{Async, Future, IntoFuture, Poll};
use body::Payload;
use crate::body::Payload;
use super::{MakeService, Service};
/// An asynchronous constructor of `Service`s.

View File

@@ -4,9 +4,9 @@ use std::marker::PhantomData;
use futures::{future, Async, Future, IntoFuture, Poll};
use body::Payload;
use common::Never;
use ::{Request, Response};
use crate::body::Payload;
use crate::common::Never;
use crate::{Request, Response};
/// An asynchronous function from `Request` to `Response`.
pub trait Service {
@@ -179,16 +179,16 @@ fn _assert_fn_mut() {
let mut val = 0;
let svc = service_fn(move |_req: Request<::Body>| {
let svc = service_fn(move |_req: Request<crate::Body>| {
val += 1;
future::ok::<_, Never>(Response::new(::Body::empty()))
future::ok::<_, Never>(Response::new(crate::Body::empty()))
});
assert_service(&svc);
let svc = service_fn_ok(move |_req: Request<::Body>| {
let svc = service_fn_ok(move |_req: Request<crate::Body>| {
val += 1;
Response::new(::Body::empty())
Response::new(crate::Body::empty())
});
assert_service(&svc);

View File

@@ -15,7 +15,7 @@ use futures::{Async, Future, Poll};
use futures::sync::oneshot;
use tokio_io::{AsyncRead, AsyncWrite};
use common::io::Rewind;
use crate::common::io::Rewind;
/// An upgraded HTTP connection.
///
@@ -33,7 +33,7 @@ pub struct Upgraded {
///
/// If no upgrade was available, or it doesn't succeed, yields an `Error`.
pub struct OnUpgrade {
rx: Option<oneshot::Receiver<::Result<Upgraded>>>,
rx: Option<oneshot::Receiver<crate::Result<Upgraded>>>,
}
/// The deconstructed parts of an [`Upgraded`](Upgraded) type.
@@ -57,7 +57,7 @@ pub struct Parts<T> {
}
pub(crate) struct Pending {
tx: oneshot::Sender<::Result<Upgraded>>
tx: oneshot::Sender<crate::Result<Upgraded>>
}
/// Error cause returned when an upgrade was expected but canceled
@@ -200,7 +200,7 @@ impl OnUpgrade {
impl Future for OnUpgrade {
type Item = Upgraded;
type Error = ::Error;
type Error = crate::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.rx {
@@ -209,10 +209,10 @@ impl Future for OnUpgrade {
Ok(Async::Ready(Ok(upgraded))) => Ok(Async::Ready(upgraded)),
Ok(Async::Ready(Err(err))) => Err(err),
Err(_oneshot_canceled) => Err(
::Error::new_canceled().with(UpgradeExpected(()))
crate::Error::new_canceled().with(UpgradeExpected(()))
),
},
None => Err(::Error::new_user_no_upgrade()),
None => Err(crate::Error::new_user_no_upgrade()),
}
}
}
@@ -236,7 +236,7 @@ impl Pending {
/// upgrades are handled manually.
pub(crate) fn manual(self) {
trace!("pending upgrade handled manually");
let _ = self.tx.send(Err(::Error::new_user_manual_upgrade()));
let _ = self.tx.send(Err(crate::Error::new_user_manual_upgrade()));
}
}

View File

@@ -5,9 +5,9 @@ pub extern crate tokio;
use std::sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}};
use std::time::Duration;
use hyper::{Body, Client, Request, Response, Server, Version};
use hyper::client::HttpConnector;
use hyper::service::service_fn;
use crate::hyper::{Body, Client, Request, Response, Server, Version};
use crate::hyper::client::HttpConnector;
use crate::hyper::service::service_fn;
pub use std::net::SocketAddr;
pub use self::futures::{future, Future, Stream};
@@ -207,12 +207,12 @@ macro_rules! __internal_headers_map {
macro_rules! __internal_headers_eq {
(@pat $name: expr, $pat:pat) => {
::std::sync::Arc::new(move |__hdrs: &::hyper::HeaderMap| {
::std::sync::Arc::new(move |__hdrs: &crate::hyper::HeaderMap| {
match __hdrs.get($name) {
$pat => (),
other => panic!("headers[{}] was not {}: {:?}", stringify!($name), stringify!($pat), other),
}
}) as ::std::sync::Arc<dyn Fn(&::hyper::HeaderMap) + Send + Sync>
}) as ::std::sync::Arc<dyn Fn(&crate::hyper::HeaderMap) + Send + Sync>
};
(@val $name: expr, NONE) => {
__internal_headers_eq!(@pat $name, None);
@@ -222,13 +222,13 @@ macro_rules! __internal_headers_eq {
};
(@val $name: expr, $val:expr) => ({
let __val = Option::from($val);
::std::sync::Arc::new(move |__hdrs: &::hyper::HeaderMap| {
::std::sync::Arc::new(move |__hdrs: &crate::hyper::HeaderMap| {
if let Some(ref val) = __val {
assert_eq!(__hdrs.get($name).expect(stringify!($name)), val.to_string().as_str(), stringify!($name));
} else {
assert_eq!(__hdrs.get($name), None, stringify!($name));
}
}) as ::std::sync::Arc<dyn Fn(&::hyper::HeaderMap) + Send + Sync>
}) as ::std::sync::Arc<dyn Fn(&crate::hyper::HeaderMap) + Send + Sync>
});
($headers:ident, { $($name:expr => $val:tt,)* }) => {
$(
@@ -378,7 +378,7 @@ pub fn __run_test(cfg: __TestConfig) {
.map_err(|never| -> hyper::Error { match never {} })
.flatten()
.map_err(|e| panic!("server connection error: {}", e));
::tokio::spawn(fut);
crate::tokio::spawn(fut);
Ok::<_, hyper::Error>(cnt)
})
.map(|_| ())