From da9b0319ef8d85662f66ac3bea74036b3dd3744e Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 9 Jul 2019 14:50:51 -0700 Subject: [PATCH] refactor(lib): update to 2018 edition --- .travis.yml | 2 +- Cargo.toml | 1 + benches/end_to_end.rs | 2 +- build.rs | 3 -- src/body/body.rs | 42 ++++++++++----------- src/body/chunk.rs | 2 +- src/body/payload.rs | 2 +- src/client/conn.rs | 40 ++++++++++---------- src/client/connect/mod.rs | 42 ++++++++++----------- src/client/dispatch.rs | 24 ++++++------ src/client/mod.rs | 40 ++++++++++---------- src/client/pool.rs | 18 ++++----- src/client/tests.rs | 14 +++---- src/common/exec.rs | 30 +++++++-------- src/error.rs | 40 +------------------- src/lib.rs | 8 ++-- src/mock.rs | 2 +- src/proto/h1/conn.rs | 50 ++++++++++++------------- src/proto/h1/decode.rs | 2 +- src/proto/h1/dispatch.rs | 74 ++++++++++++++++++------------------- src/proto/h1/encode.rs | 2 +- src/proto/h1/io.rs | 18 ++++----- src/proto/h1/mod.rs | 8 ++-- src/proto/h1/role.rs | 54 +++++++++++++-------------- src/proto/h2/client.rs | 26 ++++++------- src/proto/h2/mod.rs | 28 +++++++------- src/proto/h2/server.rs | 48 ++++++++++++------------ src/proto/mod.rs | 6 +-- src/server/conn.rs | 46 +++++++++++------------ src/server/mod.rs | 12 +++--- src/server/shutdown.rs | 10 ++--- src/server/tcp.rs | 12 +++--- src/service/make_service.rs | 2 +- src/service/new_service.rs | 2 +- src/service/service.rs | 14 +++---- src/upgrade.rs | 14 +++---- tests/support/mod.rs | 16 ++++---- 37 files changed, 358 insertions(+), 398 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6baedc9d..b35cbd3b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,7 +13,7 @@ matrix: - rust: stable env: FEATURES="--no-default-features" # Minimum Supported Rust Version - - rust: 1.27.0 + - rust: 1.31.0 env: FEATURES="--no-default-features --features runtime" BUILD_ONLY="1" before_script: diff --git a/Cargo.toml b/Cargo.toml index 4281c2da..14037742 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ license = "MIT" authors = ["Sean McArthur "] keywords = ["http", "hyper", "hyperium"] categories = ["network-programming", "web-programming::http-client", "web-programming::http-server"] +edition = "2018" publish = false diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 1901364d..734cb826 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -236,7 +236,7 @@ fn spawn_hello(rt: &mut Runtime, opts: &Opts) -> SocketAddr { let body = opts.response_body; let srv = Server::bind(&addr) - .http2_only(opts.http2); + .http2_only(opts.http2) .http2_initial_stream_window_size_(opts.http2_stream_window) .http2_initial_connection_window_size_(opts.http2_conn_window) .serve(move || { diff --git a/build.rs b/build.rs index aae89688..9eaf4a71 100644 --- a/build.rs +++ b/build.rs @@ -4,9 +4,6 @@ use rustc_version::{version, Version}; fn main() { let version = version().unwrap(); - if version >= Version::parse("1.30.0").unwrap() { - println!("cargo:rustc-cfg=error_source"); - } if version >= Version::parse("1.34.0").unwrap() { println!("cargo:rustc-cfg=try_from"); } diff --git a/src/body/body.rs b/src/body/body.rs index 00da58c5..14fba7c8 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -9,12 +9,12 @@ use tokio_buf::SizeHint; use h2; use http::HeaderMap; -use common::Never; +use crate::common::Never; use super::internal::{FullDataArg, FullDataRet}; use super::{Chunk, Payload}; -use upgrade::OnUpgrade; +use crate::upgrade::OnUpgrade; -type BodySender = mpsc::Sender>; +type BodySender = mpsc::Sender>; /// A stream of `Chunk`s, used when receiving bodies. /// @@ -34,7 +34,7 @@ enum Kind { Chan { content_length: Option, abort_rx: oneshot::Receiver<()>, - rx: mpsc::Receiver>, + rx: mpsc::Receiver>, }, H2 { content_length: Option, @@ -200,7 +200,7 @@ impl Body { })) } - fn poll_eof(&mut self) -> Poll, ::Error> { + fn poll_eof(&mut self) -> Poll, crate::Error> { match self.take_delayed_eof() { Some(DelayEof::NotEof(mut delay)) => { match self.poll_inner() { @@ -238,7 +238,7 @@ impl Body { } } - fn poll_inner(&mut self) -> Poll, ::Error> { + fn poll_inner(&mut self) -> Poll, crate::Error> { match self.kind { Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), Kind::Chan { @@ -247,7 +247,7 @@ impl Body { ref mut abort_rx, } => { if let Ok(Async::Ready(())) = abort_rx.poll() { - return Err(::Error::new_body_write("body write aborted")); + return Err(crate::Error::new_body_write("body write aborted")); } match rx.poll().expect("mpsc cannot error") { @@ -267,16 +267,16 @@ impl Body { recv: ref mut h2, .. } => h2 .poll() - .map(|async| { - async.map(|opt| { + .map(|r#async| { + r#async.map(|opt| { opt.map(|bytes| { let _ = h2.release_capacity().release_capacity(bytes.len()); Chunk::from(bytes) }) }) }) - .map_err(::Error::new_body), - Kind::Wrapped(ref mut s) => s.poll().map_err(::Error::new_body), + .map_err(crate::Error::new_body), + Kind::Wrapped(ref mut s) => s.poll().map_err(crate::Error::new_body), } } } @@ -291,7 +291,7 @@ impl Default for Body { impl Payload for Body { type Data = Chunk; - type Error = ::Error; + type Error = crate::Error; fn poll_data(&mut self) -> Poll, Self::Error> { self.poll_eof() @@ -301,7 +301,7 @@ impl Payload for Body { match self.kind { Kind::H2 { recv: ref mut h2, .. - } => h2.poll_trailers().map_err(::Error::new_h2), + } => h2.poll_trailers().map_err(crate::Error::new_h2), _ => Ok(Async::Ready(None)), } } @@ -336,7 +336,7 @@ impl Payload for Body { impl ::http_body::Body for Body { type Data = Chunk; - type Error = ::Error; + type Error = crate::Error; fn poll_data(&mut self) -> Poll, Self::Error> { ::poll_data(self) @@ -366,7 +366,7 @@ impl ::http_body::Body for Body { impl Stream for Body { type Item = Chunk; - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll, Self::Error> { self.poll_data() @@ -395,13 +395,13 @@ impl fmt::Debug for Body { impl Sender { /// Check to see if this `Sender` can send more data. - pub fn poll_ready(&mut self) -> Poll<(), ::Error> { + pub fn poll_ready(&mut self) -> Poll<(), crate::Error> { match self.abort_tx.poll_cancel() { - Ok(Async::Ready(())) | Err(_) => return Err(::Error::new_closed()), + Ok(Async::Ready(())) | Err(_) => return Err(crate::Error::new_closed()), Ok(Async::NotReady) => (), } - self.tx.poll_ready().map_err(|_| ::Error::new_closed()) + self.tx.poll_ready().map_err(|_| crate::Error::new_closed()) } /// Sends data on this channel. @@ -422,14 +422,14 @@ impl Sender { let _ = self.abort_tx.send(()); } - pub(crate) fn send_error(&mut self, err: ::Error) { + pub(crate) fn send_error(&mut self, err: crate::Error) { let _ = self.tx.try_send(Err(err)); } } impl Sink for Sender { type SinkItem = Chunk; - type SinkError = ::Error; + type SinkError = crate::Error; fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { Ok(Async::Ready(())) @@ -438,7 +438,7 @@ impl Sink for Sender { fn start_send(&mut self, msg: Chunk) -> StartSend { match self.poll_ready()? { Async::Ready(_) => { - self.send_data(msg).map_err(|_| ::Error::new_closed())?; + self.send_data(msg).map_err(|_| crate::Error::new_closed())?; Ok(AsyncSink::Ready) } Async::NotReady => Ok(AsyncSink::NotReady(msg)), diff --git a/src/body/chunk.rs b/src/body/chunk.rs index d29b00ca..c8c46989 100644 --- a/src/body/chunk.rs +++ b/src/body/chunk.rs @@ -176,7 +176,7 @@ mod tests { let mut dst = Vec::with_capacity(128); b.iter(|| { - let chunk = ::Chunk::from(s); + let chunk = crate::Chunk::from(s); dst.put(chunk); ::test::black_box(&dst); dst.clear(); diff --git a/src/body/payload.rs b/src/body/payload.rs index 0eaa6419..9e68caf4 100644 --- a/src/body/payload.rs +++ b/src/body/payload.rs @@ -65,7 +65,7 @@ pub trait Payload: Send + 'static { // The only thing a user *could* do is reference the method, but DON'T // DO THAT! :) #[doc(hidden)] - fn __hyper_full_data(&mut self, FullDataArg) -> FullDataRet { + fn __hyper_full_data(&mut self, _: FullDataArg) -> FullDataRet { FullDataRet(None) } } diff --git a/src/client/conn.rs b/src/client/conn.rs index b7a502e5..cdef789c 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -18,12 +18,12 @@ use futures::future::{self, Either, Executor}; use h2; use tokio_io::{AsyncRead, AsyncWrite}; -use body::Payload; -use common::Exec; -use upgrade::Upgraded; -use proto; +use crate::body::Payload; +use crate::common::Exec; +use crate::upgrade::Upgraded; +use crate::proto; use super::dispatch; -use {Body, Request, Response}; +use crate::{Body, Request, Response}; type Http1Dispatcher = proto::dispatch::Dispatcher< proto::dispatch::Client, @@ -39,7 +39,7 @@ type ConnEither = Either< /// Returns a `Handshake` future over some IO. /// /// This is a shortcut for `Builder::new().handshake(io)`. -pub fn handshake(io: T) -> Handshake +pub fn handshake(io: T) -> Handshake where T: AsyncRead + AsyncWrite + Send + 'static, { @@ -98,7 +98,7 @@ pub struct Handshake { pub struct ResponseFuture { // for now, a Box is used to hide away the internal `B` // that can be returned if canceled - inner: Box, Error=::Error> + Send>, + inner: Box, Error=crate::Error> + Send>, } /// Deconstructed parts of a `Connection`. @@ -145,7 +145,7 @@ impl SendRequest /// Polls to determine whether this sender can be used yet for a request. /// /// If the associated connection is closed, this returns an Error. - pub fn poll_ready(&mut self) -> Poll<(), ::Error> { + pub fn poll_ready(&mut self) -> Poll<(), crate::Error> { self.dispatch.poll_ready() } @@ -235,7 +235,7 @@ where }, Err(_req) => { debug!("connection was not ready"); - let err = ::Error::new_canceled().with("connection was not ready"); + let err = crate::Error::new_canceled().with("connection was not ready"); Either::B(future::err(err)) } }; @@ -245,7 +245,7 @@ where } } - pub(crate) fn send_request_retryable(&mut self, req: Request) -> impl Future, Error = (::Error, Option>)> + pub(crate) fn send_request_retryable(&mut self, req: Request) -> impl Future, Error = (crate::Error, Option>)> where B: Send, { @@ -262,7 +262,7 @@ where }, Err(req) => { debug!("connection was not ready"); - let err = ::Error::new_canceled().with("connection was not ready"); + let err = crate::Error::new_canceled().with("connection was not ready"); Either::B(future::err((err, Some(req)))) } } @@ -305,7 +305,7 @@ impl Http2SendRequest where B: Payload + 'static, { - pub(super) fn send_request_retryable(&mut self, req: Request) -> impl Future, Error=(::Error, Option>)> + pub(super) fn send_request_retryable(&mut self, req: Request) -> impl Future, Error=(crate::Error, Option>)> where B: Send, { @@ -322,7 +322,7 @@ where }, Err(req) => { debug!("connection was not ready"); - let err = ::Error::new_canceled().with("connection was not ready"); + let err = crate::Error::new_canceled().with("connection was not ready"); Either::B(future::err((err, Some(req)))) } } @@ -380,7 +380,7 @@ where /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html) /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) /// to work with this function; or use the `without_shutdown` wrapper. - pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> { + pub fn poll_without_shutdown(&mut self) -> Poll<(), crate::Error> { match self.inner.as_mut().expect("already upgraded") { &mut Either::A(ref mut h1) => { h1.poll_without_shutdown() @@ -393,9 +393,9 @@ where /// Prevent shutdown of the underlying IO object at the end of service the request, /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`. - pub fn without_shutdown(self) -> impl Future, Error=::Error> { + pub fn without_shutdown(self) -> impl Future, Error=crate::Error> { let mut conn = Some(self); - ::futures::future::poll_fn(move || -> ::Result<_> { + ::futures::future::poll_fn(move || -> crate::Result<_> { try_ready!(conn.as_mut().unwrap().poll_without_shutdown()); Ok(conn.take().unwrap().into_parts().into()) }) @@ -408,7 +408,7 @@ where B: Payload + 'static, { type Item = (); - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { match try_ready!(self.inner.poll()) { @@ -552,7 +552,7 @@ where B: Payload + 'static, { type Item = (SendRequest, Connection); - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { let io = self.io.take().expect("polled more than once"); @@ -601,7 +601,7 @@ impl fmt::Debug for Handshake { impl Future for ResponseFuture { type Item = Response; - type Error = ::Error; + type Error = crate::Error; #[inline] fn poll(&mut self) -> Poll { @@ -620,7 +620,7 @@ impl fmt::Debug for ResponseFuture { impl Future for WhenReady { type Item = SendRequest; - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { let mut tx = self.tx.take().expect("polled after complete"); diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index 4d16843b..d62533fa 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -11,7 +11,7 @@ use std::{fmt, mem}; use bytes::{BufMut, Bytes, BytesMut}; use futures::Future; -use http::{uri, Response, Uri}; +use ::http::{uri, Response, Uri}; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "runtime")] pub mod dns; @@ -68,9 +68,9 @@ impl Destination { /// /// Returns an error if the uri contains no authority or /// no scheme. - pub fn try_from_uri(uri: Uri) -> ::Result { - uri.authority_part().ok_or(::error::Parse::Uri)?; - uri.scheme_part().ok_or(::error::Parse::Uri)?; + pub fn try_from_uri(uri: Uri) -> crate::Result { + uri.authority_part().ok_or(crate::error::Parse::Uri)?; + uri.scheme_part().ok_or(crate::error::Parse::Uri)?; Ok(Destination { uri }) } @@ -116,8 +116,8 @@ impl Destination { /// # Error /// /// Returns an error if the string is not a valid scheme. - pub fn set_scheme(&mut self, scheme: &str) -> ::Result<()> { - let scheme = scheme.parse().map_err(::error::Parse::from)?; + pub fn set_scheme(&mut self, scheme: &str) -> crate::Result<()> { + let scheme = scheme.parse().map_err(crate::error::Parse::from)?; self.update_uri(move |parts| { parts.scheme = Some(scheme); }) @@ -143,19 +143,19 @@ impl Destination { /// # Error /// /// Returns an error if the string is not a valid hostname. - pub fn set_host(&mut self, host: &str) -> ::Result<()> { + pub fn set_host(&mut self, host: &str) -> crate::Result<()> { // Prevent any userinfo setting, it's bad! if host.contains('@') { - return Err(::error::Parse::Uri.into()); + return Err(crate::error::Parse::Uri.into()); } let auth = if let Some(port) = self.port() { let bytes = Bytes::from(format!("{}:{}", host, port)); uri::Authority::from_shared(bytes) - .map_err(::error::Parse::from)? + .map_err(crate::error::Parse::from)? } else { - let auth = host.parse::().map_err(::error::Parse::from)?; + let auth = host.parse::().map_err(crate::error::Parse::from)?; if auth.port_part().is_some() { // std::uri::Authority::Uri - return Err(::error::Parse::Uri.into()); + return Err(crate::error::Parse::Uri.into()); } auth }; @@ -218,7 +218,7 @@ impl Destination { .expect("valid uri should be valid with port"); } - fn update_uri(&mut self, f: F) -> ::Result<()> + fn update_uri(&mut self, f: F) -> crate::Result<()> where F: FnOnce(&mut uri::Parts) { @@ -236,7 +236,7 @@ impl Destination { }, Err(err) => { self.uri = old_uri; - Err(::error::Parse::from(err).into()) + Err(crate::error::Parse::from(err).into()) }, } } @@ -254,7 +254,7 @@ impl Destination { #[cfg(try_from)] impl TryFrom for Destination { - type Error = ::error::Error; + type Error = crate::error::Error; fn try_from(uri: Uri) -> Result { Destination::try_from_uri(uri) @@ -325,7 +325,7 @@ impl Connected { // ===== impl Extra ===== impl Extra { - pub(super) fn set(&self, res: &mut Response<::Body>) { + pub(super) fn set(&self, res: &mut Response) { self.0.set(res); } } @@ -345,7 +345,7 @@ impl fmt::Debug for Extra { trait ExtraInner: Send + Sync { fn clone_box(&self) -> Box; - fn set(&self, res: &mut Response<::Body>); + fn set(&self, res: &mut Response); } // This indirection allows the `Connected` to have a type-erased "extra" value, @@ -362,7 +362,7 @@ where Box::new(self.clone()) } - fn set(&self, res: &mut Response<::Body>) { + fn set(&self, res: &mut Response) { res.extensions_mut().insert(self.0.clone()); } } @@ -383,7 +383,7 @@ where Box::new(self.clone()) } - fn set(&self, res: &mut Response<::Body>) { + fn set(&self, res: &mut Response) { self.0.set(res); res.extensions_mut().insert(self.1.clone()); } @@ -567,7 +567,7 @@ mod tests { let c1 = Connected::new() .extra(Ex1(41)); - let mut res1 = ::Response::new(::Body::empty()); + let mut res1 = crate::Response::new(crate::Body::empty()); assert_eq!(res1.extensions().get::(), None); @@ -590,7 +590,7 @@ mod tests { .extra(Ex2("zoom")) .extra(Ex3("pew pew")); - let mut res1 = ::Response::new(::Body::empty()); + let mut res1 = crate::Response::new(crate::Body::empty()); assert_eq!(res1.extensions().get::(), None); assert_eq!(res1.extensions().get::(), None); @@ -612,7 +612,7 @@ mod tests { .extra(Ex2("hiccup")) .extra(Ex1(99)); - let mut res2 = ::Response::new(::Body::empty()); + let mut res2 = crate::Response::new(crate::Body::empty()); c2 .extra diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 31541027..7aa0d1c8 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -2,10 +2,10 @@ use futures::{future, Async, Future, Poll, Stream}; use futures::sync::{mpsc, oneshot}; use want; -use common::Never; +use crate::common::Never; -pub type RetryPromise = oneshot::Receiver)>>; -pub type Promise = oneshot::Receiver>; +pub type RetryPromise = oneshot::Receiver)>>; +pub type Promise = oneshot::Receiver>; pub fn channel() -> (Sender, Receiver) { let (tx, rx) = mpsc::unbounded(); @@ -51,9 +51,9 @@ pub struct UnboundedSender { } impl Sender { - pub fn poll_ready(&mut self) -> Poll<(), ::Error> { + pub fn poll_ready(&mut self) -> Poll<(), crate::Error> { self.giver.poll_want() - .map_err(|_| ::Error::new_closed()) + .map_err(|_| crate::Error::new_closed()) } pub fn is_ready(&self) -> bool { @@ -167,14 +167,14 @@ struct Envelope(Option<(T, Callback)>); impl Drop for Envelope { fn drop(&mut self) { if let Some((val, cb)) = self.0.take() { - let _ = cb.send(Err((::Error::new_canceled().with("connection closed"), Some(val)))); + let _ = cb.send(Err((crate::Error::new_canceled().with("connection closed"), Some(val)))); } } } pub enum Callback { - Retry(oneshot::Sender)>>), - NoRetry(oneshot::Sender>), + Retry(oneshot::Sender)>>), + NoRetry(oneshot::Sender>), } impl Callback { @@ -192,7 +192,7 @@ impl Callback { } } - pub(crate) fn send(self, val: Result)>) { + pub(crate) fn send(self, val: Result)>) { match self { Callback::Retry(tx) => { let _ = tx.send(val); @@ -205,7 +205,7 @@ impl Callback { pub(crate) fn send_when( self, - mut when: impl Future)>, + mut when: impl Future)>, ) -> impl Future { let mut cb = Some(self); @@ -266,7 +266,7 @@ mod tests { .expect_err("promise should error"); match (err.0.kind(), err.1) { - (&::error::Kind::Canceled, Some(_)) => (), + (&crate::error::Kind::Canceled, Some(_)) => (), e => panic!("expected Error::Cancel(_), found {:?}", e), } @@ -312,7 +312,7 @@ mod tests { #[cfg(feature = "nightly")] #[bench] fn giver_queue_throughput(b: &mut test::Bencher) { - use {Body, Request, Response}; + use crate::{Body, Request, Response}; let (mut tx, mut rx) = super::channel::, Response>(); b.iter(move || { diff --git a/src/client/mod.rs b/src/client/mod.rs index 3ad159b2..ce497dac 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -89,8 +89,8 @@ use http::{Method, Request, Response, Uri, Version}; use http::header::{HeaderValue, HOST}; use http::uri::Scheme; -use body::{Body, Payload}; -use common::{lazy as hyper_lazy, Lazy}; +use crate::body::{Body, Payload}; +use crate::common::{lazy as hyper_lazy, Lazy}; use self::connect::{Alpn, Connect, Connected, Destination}; use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation}; @@ -237,7 +237,7 @@ where C: Connect + Sync + 'static, Version::HTTP_11 => (), Version::HTTP_10 => if is_http_connect { warn!("CONNECT is not allowed for HTTP/1.0"); - return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method()))); + return ResponseFuture::new(Box::new(future::err(crate::Error::new_user_unsupported_request_method()))); }, other_h2 @ Version::HTTP_2 => if self.config.ver != Ver::Http2 { return ResponseFuture::error_version(other_h2); @@ -257,7 +257,7 @@ where C: Connect + Sync + 'static, ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key))) } - fn retryably_send_request(&self, req: Request, pool_key: PoolKey) -> impl Future, Error=::Error> { + fn retryably_send_request(&self, req: Request, pool_key: PoolKey) -> impl Future, Error=crate::Error> { let client = self.clone(); let uri = req.uri().clone(); @@ -320,7 +320,7 @@ where C: Connect + Sync + 'static, }; } else if req.method() == &Method::CONNECT { debug!("client does not support CONNECT requests over HTTP2"); - return Either::A(future::err(ClientError::Normal(::Error::new_user_unsupported_request_method()))); + return Either::A(future::err(ClientError::Normal(crate::Error::new_user_unsupported_request_method()))); } let fut = pooled.send_request_retryable(req) @@ -478,7 +478,7 @@ where C: Connect + Sync + 'static, } fn connect_to(&self, uri: Uri, pool_key: PoolKey) - -> impl Lazy>, Error=::Error> + -> impl Lazy>, Error=crate::Error> { let executor = self.conn_builder.exec.clone(); let pool = self.pool.clone(); @@ -498,12 +498,12 @@ where C: Connect + Sync + 'static, let connecting = match pool.connecting(&pool_key, ver) { Some(lock) => lock, None => { - let canceled = ::Error::new_canceled().with("HTTP/2 connection in progress"); + let canceled = crate::Error::new_canceled().with("HTTP/2 connection in progress"); return Either::B(future::err(canceled)); } }; Either::A(connector.connect(dst) - .map_err(::Error::new_connect) + .map_err(crate::Error::new_connect) .and_then(move |(io, connected)| { // If ALPN is h2 and we aren't http2_only already, // then we need to convert our pool checkout into @@ -517,7 +517,7 @@ where C: Connect + Sync + 'static, None => { // Another connection has already upgraded, // the pool checkout should finish up for us. - let canceled = ::Error::new_canceled().with("ALPN upgraded to HTTP/2"); + let canceled = crate::Error::new_canceled().with("ALPN upgraded to HTTP/2"); return Either::B(future::err(canceled)); } } @@ -583,11 +583,11 @@ impl fmt::Debug for Client { /// This is returned by `Client::request` (and `Client::get`). #[must_use = "futures do nothing unless polled"] pub struct ResponseFuture { - inner: Box, Error=::Error> + Send>, + inner: Box, Error=crate::Error> + Send>, } impl ResponseFuture { - fn new(fut: Box, Error=::Error> + Send>) -> Self { + fn new(fut: Box, Error=crate::Error> + Send>) -> Self { Self { inner: fut, } @@ -595,7 +595,7 @@ impl ResponseFuture { fn error_version(ver: Version) -> Self { warn!("Request has unsupported version \"{:?}\"", ver); - ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_version()))) + ResponseFuture::new(Box::new(future::err(crate::Error::new_user_unsupported_version()))) } } @@ -607,7 +607,7 @@ impl fmt::Debug for ResponseFuture { impl Future for ResponseFuture { type Item = Response; - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { self.inner.poll() @@ -627,7 +627,7 @@ enum PoolTx { } impl PoolClient { - fn poll_ready(&mut self) -> Poll<(), ::Error> { + fn poll_ready(&mut self) -> Poll<(), crate::Error> { match self.tx { PoolTx::Http1(ref mut tx) => tx.poll_ready(), PoolTx::Http2(_) => Ok(Async::Ready(())), @@ -661,7 +661,7 @@ impl PoolClient { } impl PoolClient { - fn send_request_retryable(&mut self, req: Request) -> impl Future, Error = (::Error, Option>)> + fn send_request_retryable(&mut self, req: Request) -> impl Future, Error = (crate::Error, Option>)> where B: Send, { @@ -713,17 +713,17 @@ where // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] enum ClientError { - Normal(::Error), + Normal(crate::Error), Canceled { connection_reused: bool, req: Request, - reason: ::Error, + reason: crate::Error, } } impl ClientError { fn map_with_reused(conn_reused: bool) - -> impl Fn((::Error, Option>)) -> Self + -> impl Fn((crate::Error, Option>)) -> Self { move |(err, orig_req)| { if let Some(req) = orig_req { @@ -797,7 +797,7 @@ fn authority_form(uri: &mut Uri) { }; } -fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> ::Result { +fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> crate::Result { let uri_clone = uri.clone(); match (uri_clone.scheme_part(), uri_clone.authority_part()) { (Some(scheme), Some(auth)) => { @@ -819,7 +819,7 @@ fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> ::Result { }, _ => { debug!("Client requires absolute-form URIs, received: {:?}", uri); - Err(::Error::new_user_absolute_uri_required()) + Err(crate::Error::new_user_absolute_uri_required()) } } } diff --git a/src/client/pool.rs b/src/client/pool.rs index bf7cefaa..9f18886e 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -9,7 +9,7 @@ use futures::sync::oneshot; #[cfg(feature = "runtime")] use tokio_timer::Interval; -use common::Exec; +use crate::common::Exec; use super::Ver; // FIXME: allow() required due to `impl Trait` leaking types to this lint @@ -75,7 +75,7 @@ struct PoolInner { // A oneshot channel is used to allow the interval to be notified when // the Pool completely drops. That way, the interval can cancel immediately. #[cfg(feature = "runtime")] - idle_interval_ref: Option>, + idle_interval_ref: Option>, #[cfg(feature = "runtime")] exec: Exec, timeout: Option, @@ -569,7 +569,7 @@ pub(super) struct Checkout { } impl Checkout { - fn poll_waiter(&mut self) -> Poll>, ::Error> { + fn poll_waiter(&mut self) -> Poll>, crate::Error> { static CANCELED: &str = "pool checkout failed"; if let Some(mut rx) = self.waiter.take() { match rx.poll() { @@ -577,14 +577,14 @@ impl Checkout { if value.is_open() { Ok(Async::Ready(Some(self.pool.reuse(&self.key, value)))) } else { - Err(::Error::new_canceled().with(CANCELED)) + Err(crate::Error::new_canceled().with(CANCELED)) } }, Ok(Async::NotReady) => { self.waiter = Some(rx); Ok(Async::NotReady) }, - Err(_canceled) => Err(::Error::new_canceled().with(CANCELED)), + Err(_canceled) => Err(crate::Error::new_canceled().with(CANCELED)), } } else { Ok(Async::Ready(None)) @@ -644,7 +644,7 @@ impl Checkout { impl Future for Checkout { type Item = Pooled; - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { if let Some(pooled) = try_ready!(self.poll_waiter()) { @@ -654,7 +654,7 @@ impl Future for Checkout { if let Some(pooled) = self.checkout() { Ok(Async::Ready(pooled)) } else if !self.pool.is_enabled() { - Err(::Error::new_canceled().with("pool is disabled")) + Err(crate::Error::new_canceled().with("pool is disabled")) } else { Ok(Async::NotReady) } @@ -723,7 +723,7 @@ struct IdleInterval { // This allows the IdleInterval to be notified as soon as the entire // Pool is fully dropped, and shutdown. This channel is never sent on, // but Err(Canceled) will be received when the Pool is dropped. - pool_drop_notifier: oneshot::Receiver<::common::Never>, + pool_drop_notifier: oneshot::Receiver, } #[cfg(feature = "runtime")] @@ -783,7 +783,7 @@ mod tests { use std::time::Duration; use futures::{Async, Future}; use futures::future; - use common::Exec; + use crate::common::Exec; use super::{Connecting, Key, Poolable, Pool, Reservation, WeakOpt}; /// Test unique reservations. diff --git a/src/client/tests.rs b/src/client/tests.rs index b55dc76d..5c45a3ec 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -6,7 +6,7 @@ use futures::future::poll_fn; use futures::sync::oneshot; use tokio::runtime::current_thread::Runtime; -use mock::MockConnector; +use crate::mock::MockConnector; use super::*; #[test] @@ -20,7 +20,7 @@ fn retryable_request() { let sock2 = connector.mock("http://mock.local"); let client = Client::builder() - .build::<_, ::Body>(connector); + .build::<_, crate::Body>(connector); client.pool.no_timer(); @@ -67,7 +67,7 @@ fn conn_reset_after_write() { let sock1 = connector.mock("http://mock.local"); let client = Client::builder() - .build::<_, ::Body>(connector); + .build::<_, crate::Body>(connector); client.pool.no_timer(); @@ -119,11 +119,11 @@ fn checkout_win_allows_connect_future_to_be_pooled() { let sock2 = connector.mock_fut("http://mock.local", rx); let client = Client::builder() - .build::<_, ::Body>(connector); + .build::<_, crate::Body>(connector); client.pool.no_timer(); - let uri = "http://mock.local/a".parse::<::Uri>().expect("uri parse"); + let uri = "http://mock.local/a".parse::().expect("uri parse"); // First request just sets us up to have a connection able to be put // back in the pool. *However*, it doesn't insert immediately. The @@ -214,7 +214,7 @@ fn bench_http1_get_0b(b: &mut test::Bencher) { let client = Client::builder() - .build::<_, ::Body>(connector.clone()); + .build::<_, crate::Body>(connector.clone()); client.pool.no_timer(); @@ -246,7 +246,7 @@ fn bench_http1_get_10b(b: &mut test::Bencher) { let client = Client::builder() - .build::<_, ::Body>(connector.clone()); + .build::<_, crate::Body>(connector.clone()); client.pool.no_timer(); diff --git a/src/common/exec.rs b/src/common/exec.rs index 4c2d489b..5b5b8727 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -3,17 +3,17 @@ use std::sync::Arc; use futures::future::{Executor, Future}; -use body::Payload; -use proto::h2::server::H2Stream; -use server::conn::spawn_all::{NewSvcTask, Watcher}; -use service::Service; +use crate::body::Payload; +use crate::proto::h2::server::H2Stream; +use crate::server::conn::spawn_all::{NewSvcTask, Watcher}; +use crate::service::Service; pub trait H2Exec: Clone { - fn execute_h2stream(&self, fut: H2Stream) -> ::Result<()>; + fn execute_h2stream(&self, fut: H2Stream) -> crate::Result<()>; } pub trait NewSvcExec>: Clone { - fn execute_new_svc(&self, fut: NewSvcTask) -> ::Result<()>; + fn execute_new_svc(&self, fut: NewSvcTask) -> crate::Result<()>; } // Either the user provides an executor for background tasks, or we use @@ -27,7 +27,7 @@ pub enum Exec { // ===== impl Exec ===== impl Exec { - pub(crate) fn execute(&self, fut: F) -> ::Result<()> + pub(crate) fn execute(&self, fut: F) -> crate::Result<()> where F: Future + Send + 'static, { @@ -62,7 +62,7 @@ impl Exec { .spawn(Box::new(fut)) .map_err(|err| { warn!("executor error: {:?}", err); - ::Error::new_execute(TokioSpawnError) + crate::Error::new_execute(TokioSpawnError) }) } #[cfg(not(feature = "runtime"))] @@ -75,7 +75,7 @@ impl Exec { e.execute(Box::new(fut)) .map_err(|err| { warn!("executor error: {:?}", err.kind()); - ::Error::new_execute("custom executor failed") + crate::Error::new_execute("custom executor failed") }) }, } @@ -95,7 +95,7 @@ where H2Stream: Future + Send + 'static, B: Payload, { - fn execute_h2stream(&self, fut: H2Stream) -> ::Result<()> { + fn execute_h2stream(&self, fut: H2Stream) -> crate::Result<()> { self.execute(fut) } } @@ -106,7 +106,7 @@ where S: Service, W: Watcher, { - fn execute_new_svc(&self, fut: NewSvcTask) -> ::Result<()> { + fn execute_new_svc(&self, fut: NewSvcTask) -> crate::Result<()> { self.execute(fut) } } @@ -119,11 +119,11 @@ where H2Stream: Future, B: Payload, { - fn execute_h2stream(&self, fut: H2Stream) -> ::Result<()> { + fn execute_h2stream(&self, fut: H2Stream) -> crate::Result<()> { self.execute(fut) .map_err(|err| { warn!("executor error: {:?}", err.kind()); - ::Error::new_execute("custom executor failed") + crate::Error::new_execute("custom executor failed") }) } } @@ -135,11 +135,11 @@ where S: Service, W: Watcher, { - fn execute_new_svc(&self, fut: NewSvcTask) -> ::Result<()> { + fn execute_new_svc(&self, fut: NewSvcTask) -> crate::Result<()> { self.execute(fut) .map_err(|err| { warn!("executor error: {:?}", err.kind()); - ::Error::new_execute("custom executor failed") + crate::Error::new_execute("custom executor failed") }) } } diff --git a/src/error.rs b/src/error.rs index 251d8671..1a025c5e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -133,14 +133,8 @@ impl Error { self.inner.kind == Kind::IncompleteMessage } - #[doc(hidden)] - #[cfg_attr(error_source, deprecated(note = "use Error::source instead"))] - pub fn cause2(&self) -> Option<&(dyn StdError + 'static + Sync + Send)> { - self.inner.cause.as_ref().map(|e| &**e) - } - /// Consumes the error, returning its cause. - pub fn into_cause(self) -> Option> { + pub fn into_cause(self) -> Option> { self.inner.cause } @@ -162,28 +156,6 @@ impl Error { &self.inner.kind } - #[cfg(not(error_source))] - pub(crate) fn h2_reason(&self) -> h2::Reason { - // Since we don't have access to `Error::source`, we can only - // look so far... - let mut cause = self.cause2(); - while let Some(err) = cause { - if let Some(h2_err) = err.downcast_ref::() { - return h2_err - .reason() - .unwrap_or(h2::Reason::INTERNAL_ERROR); - } - - cause = err - .downcast_ref::() - .and_then(Error::cause2); - } - - // else - h2::Reason::INTERNAL_ERROR - } - - #[cfg(error_source)] pub(crate) fn h2_reason(&self) -> h2::Reason { // Find an h2::Reason somewhere in the cause stack, if it exists, // otherwise assume an INTERNAL_ERROR. @@ -370,16 +342,6 @@ impl StdError for Error { } } - #[cfg(not(error_source))] - fn cause(&self) -> Option<&StdError> { - self - .inner - .cause - .as_ref() - .map(|cause| &**cause as &StdError) - } - - #[cfg(error_source)] fn source(&self) -> Option<&(dyn StdError + 'static)> { self .inner diff --git a/src/lib.rs b/src/lib.rs index f987fb5e..0a645389 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,10 +53,10 @@ pub use http::{ Version, }; -pub use client::Client; -pub use error::{Result, Error}; -pub use body::{Body, Chunk}; -pub use server::Server; +pub use crate::client::Client; +pub use crate::error::{Result, Error}; +pub use crate::body::{Body, Chunk}; +pub use crate::server::Server; #[macro_use] mod common; diff --git a/src/mock.rs b/src/mock.rs index 8c3dad3f..27526932 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -13,7 +13,7 @@ use futures::task::{self, Task}; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "runtime")] -use ::client::connect::{Connect, Connected, Destination}; +use crate::client::connect::{Connect, Connected, Destination}; #[derive(Debug)] pub struct MockCursor { diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 721ed9d0..6079f1f3 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -8,9 +8,9 @@ use http::{HeaderMap, Method, Version}; use http::header::{HeaderValue, CONNECTION}; use tokio_io::{AsyncRead, AsyncWrite}; -use ::Chunk; -use proto::{BodyLength, DecodedLength, MessageHead}; -use headers::connection_keep_alive; +use crate::Chunk; +use crate::proto::{BodyLength, DecodedLength, MessageHead}; +use crate::headers::connection_keep_alive; use super::io::{Buffered}; use super::{EncodedBuf, Encode, Encoder, /*Decode,*/ Decoder, Http1Transaction, ParseContext}; @@ -84,7 +84,7 @@ where I: AsyncRead + AsyncWrite, self.io.into_inner() } - pub fn pending_upgrade(&mut self) -> Option<::upgrade::Pending> { + pub fn pending_upgrade(&mut self) -> Option { self.state.upgrade.take() } @@ -129,7 +129,7 @@ where I: AsyncRead + AsyncWrite, read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE } - pub fn read_head(&mut self) -> Poll, DecodedLength, bool)>, ::Error> { + pub fn read_head(&mut self) -> Poll, DecodedLength, bool)>, crate::Error> { debug_assert!(self.can_read_head()); trace!("Conn::read_head"); @@ -168,7 +168,7 @@ where I: AsyncRead + AsyncWrite, Ok(Async::Ready(Some((msg.head, msg.decode, msg.wants_upgrade)))) } - fn on_read_head_error(&mut self, e: ::Error) -> Poll, ::Error> { + fn on_read_head_error(&mut self, e: crate::Error) -> Poll, crate::Error> { // If we are currently waiting on a message, then an empty // message should be reported as an error. If not, it is just // the connection closing gracefully. @@ -233,7 +233,7 @@ where I: AsyncRead + AsyncWrite, ret } - pub fn read_keep_alive(&mut self) -> Poll<(), ::Error> { + pub fn read_keep_alive(&mut self) -> Poll<(), crate::Error> { debug_assert!(!self.can_read_head() && !self.can_read_body()); if self.is_mid_message() { @@ -254,22 +254,22 @@ where I: AsyncRead + AsyncWrite, // // This should only be called for Clients wanting to enter the idle // state. - fn require_empty_read(&mut self) -> Poll<(), ::Error> { + fn require_empty_read(&mut self) -> Poll<(), crate::Error> { debug_assert!(!self.can_read_head() && !self.can_read_body()); debug_assert!(!self.is_mid_message()); debug_assert!(T::is_client()); if !self.io.read_buf().is_empty() { debug!("received an unexpected {} bytes", self.io.read_buf().len()); - return Err(::Error::new_unexpected_message()); + return Err(crate::Error::new_unexpected_message()); } - let num_read = try_ready!(self.force_io_read().map_err(::Error::new_io)); + let num_read = try_ready!(self.force_io_read().map_err(crate::Error::new_io)); if num_read == 0 { let ret = if self.should_error_on_eof() { trace!("found unexpected EOF on busy connection: {:?}", self.state); - Err(::Error::new_incomplete()) + Err(crate::Error::new_incomplete()) } else { trace!("found EOF on idle connection, closing"); Ok(Async::Ready(())) @@ -281,10 +281,10 @@ where I: AsyncRead + AsyncWrite, } debug!("received unexpected {} bytes on an idle connection", num_read); - Err(::Error::new_unexpected_message()) + Err(crate::Error::new_unexpected_message()) } - fn mid_message_detect_eof(&mut self) -> Poll<(), ::Error> { + fn mid_message_detect_eof(&mut self) -> Poll<(), crate::Error> { debug_assert!(!self.can_read_head() && !self.can_read_body()); debug_assert!(self.is_mid_message()); @@ -292,12 +292,12 @@ where I: AsyncRead + AsyncWrite, return Ok(Async::NotReady); } - let num_read = try_ready!(self.force_io_read().map_err(::Error::new_io)); + let num_read = try_ready!(self.force_io_read().map_err(crate::Error::new_io)); if num_read == 0 { trace!("found unexpected EOF on busy connection: {:?}", self.state); self.state.close_read(); - Err(::Error::new_incomplete()) + Err(crate::Error::new_incomplete()) } else { Ok(Async::Ready(())) } @@ -563,12 +563,12 @@ where I: AsyncRead + AsyncWrite, // // - Client: there is nothing we can do // - Server: if Response hasn't been written yet, we can send a 4xx response - fn on_parse_error(&mut self, err: ::Error) -> ::Result<()> { + fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> { match self.state.writing { Writing::Init => { if self.has_h2_prefix() { - return Err(::Error::new_version_h2()) + return Err(crate::Error::new_version_h2()) } if let Some(msg) = T::on_error(&err) { // Drop the cached headers so as to not trigger a debug @@ -623,7 +623,7 @@ where I: AsyncRead + AsyncWrite, } } - pub fn take_error(&mut self) -> ::Result<()> { + pub fn take_error(&mut self) -> crate::Result<()> { if let Some(err) = self.state.error.take() { Err(err) } else { @@ -631,7 +631,7 @@ where I: AsyncRead + AsyncWrite, } } - pub(super) fn on_upgrade(&mut self) -> ::upgrade::OnUpgrade { + pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade { trace!("{}: prepare possible HTTP upgrade", T::LOG); self.state.prepare_upgrade() } @@ -658,7 +658,7 @@ struct State { cached_headers: Option, /// If an error occurs when there wasn't a direct way to return it /// back to the user, this is set. - error: Option<::Error>, + error: Option, /// Current keep-alive status. keep_alive: KA, /// If mid-message, the HTTP Method that started it. @@ -675,7 +675,7 @@ struct State { /// State of allowed writes writing: Writing, /// An expected pending HTTP upgrade. - upgrade: Option<::upgrade::Pending>, + upgrade: Option, /// Either HTTP/1.0 or 1.1 connection version: Version, } @@ -868,9 +868,9 @@ impl State { } } - fn prepare_upgrade(&mut self) -> ::upgrade::OnUpgrade { + fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade { debug_assert!(self.upgrade.is_none()); - let (tx, rx) = ::upgrade::pending(); + let (tx, rx) = crate::upgrade::pending(); self.upgrade = Some(tx); rx } @@ -888,9 +888,9 @@ mod tests { let len = s.len(); b.bytes = len as u64; - let mut io = ::mock::AsyncIo::new_buf(Vec::new(), 0); + let mut io = crate::mock::AsyncIo::new_buf(Vec::new(), 0); io.panic(); - let mut conn = Conn::<_, ::Chunk, ::proto::h1::ServerTransaction>::new(io); + let mut conn = Conn::<_, crate::Chunk, crate::proto::h1::ServerTransaction>::new(io); *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]); conn.state.cached_headers = Some(HeaderMap::with_capacity(2)); diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index c89ffcdc..e783685d 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -323,7 +323,7 @@ mod tests { use super::super::io::MemRead; use futures::{Async, Poll}; use bytes::{BytesMut, Bytes}; - use mock::AsyncIo; + use crate::mock::AsyncIo; impl<'a> MemRead for &'a [u8] { fn read_mem(&mut self, len: usize) -> Poll { diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 69a06de1..c896ea44 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -5,17 +5,17 @@ use futures::{Async, Future, Poll, Stream}; use http::{Request, Response, StatusCode}; use tokio_io::{AsyncRead, AsyncWrite}; -use body::{Body, Payload}; -use body::internal::FullDataArg; -use common::{Never, YieldNow}; -use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead}; +use crate::body::{Body, Payload}; +use crate::body::internal::FullDataArg; +use crate::common::{Never, YieldNow}; +use crate::proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead}; use super::Http1Transaction; -use service::Service; +use crate::service::Service; pub(crate) struct Dispatcher { conn: Conn, dispatch: D, - body_tx: Option<::body::Sender>, + body_tx: Option, body_rx: Option, is_closing: bool, /// If the poll loop reaches its max spin count, it will yield by notifying @@ -30,7 +30,7 @@ pub(crate) trait Dispatch { type PollError; type RecvItem; fn poll_msg(&mut self) -> Poll, Self::PollError>; - fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>; + fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>; fn poll_ready(&mut self) -> Poll<(), ()>; fn should_poll(&self) -> bool; } @@ -41,11 +41,11 @@ pub struct Server { } pub struct Client { - callback: Option<::client::dispatch::Callback, Response>>, + callback: Option, Response>>, rx: ClientRx, } -type ClientRx = ::client::dispatch::Receiver, Response>; +type ClientRx = crate::client::dispatch::Receiver, Response>; impl Dispatcher where @@ -80,7 +80,7 @@ where /// /// This is useful for old-style HTTP upgrades, but ignores /// newer-style upgrade API. - pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> { + pub fn poll_without_shutdown(&mut self) -> Poll<(), crate::Error> { self.poll_catch(false) .map(|x| { x.map(|ds| if let Dispatched::Upgrade(pending) = ds { @@ -89,7 +89,7 @@ where }) } - fn poll_catch(&mut self, should_shutdown: bool) -> Poll { + fn poll_catch(&mut self, should_shutdown: bool) -> Poll { self.poll_inner(should_shutdown).or_else(|e| { // An error means we're shutting down either way. // We just try to give the error to the user, @@ -100,7 +100,7 @@ where }) } - fn poll_inner(&mut self, should_shutdown: bool) -> Poll { + fn poll_inner(&mut self, should_shutdown: bool) -> Poll { T::update_date(); try_ready!(self.poll_loop()); @@ -110,7 +110,7 @@ where self.conn.take_error()?; return Ok(Async::Ready(Dispatched::Upgrade(pending))); } else if should_shutdown { - try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown)); + try_ready!(self.conn.shutdown().map_err(crate::Error::new_shutdown)); } self.conn.take_error()?; Ok(Async::Ready(Dispatched::Shutdown)) @@ -119,7 +119,7 @@ where } } - fn poll_loop(&mut self) -> Poll<(), ::Error> { + fn poll_loop(&mut self) -> Poll<(), crate::Error> { // Limit the looping on this connection, in case it is ready far too // often, so that other futures don't starve. // @@ -155,7 +155,7 @@ where } } - fn poll_read(&mut self) -> Poll<(), ::Error> { + fn poll_read(&mut self) -> Poll<(), crate::Error> { loop { if self.is_closing { return Ok(Async::Ready(())); @@ -199,7 +199,7 @@ where return Ok(Async::NotReady); } Err(e) => { - body.send_error(::Error::new_body(e)); + body.send_error(crate::Error::new_body(e)); } } } else { @@ -211,7 +211,7 @@ where } } - fn poll_read_head(&mut self) -> Poll<(), ::Error> { + fn poll_read_head(&mut self) -> Poll<(), crate::Error> { // can dispatch receive, or does it still care about, an incoming message? match self.dispatch.poll_ready() { Ok(Async::Ready(())) => (), @@ -255,12 +255,12 @@ where } } - fn poll_write(&mut self) -> Poll<(), ::Error> { + fn poll_write(&mut self) -> Poll<(), crate::Error> { loop { if self.is_closing { return Ok(Async::Ready(())); } else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() { - if let Some((head, mut body)) = try_ready!(self.dispatch.poll_msg().map_err(::Error::new_user_service)) { + if let Some((head, mut body)) = try_ready!(self.dispatch.poll_msg().map_err(crate::Error::new_user_service)) { // Check if the body knows its full data immediately. // // If so, we can skip a bit of bookkeeping that streaming @@ -294,7 +294,7 @@ where ); continue; } - match body.poll_data().map_err(::Error::new_user_body)? { + match body.poll_data().map_err(crate::Error::new_user_body)? { Async::Ready(Some(chunk)) => { let eos = body.is_end_stream(); if eos { @@ -327,10 +327,10 @@ where } } - fn poll_flush(&mut self) -> Poll<(), ::Error> { + fn poll_flush(&mut self) -> Poll<(), crate::Error> { self.conn.flush().map_err(|err| { debug!("error writing: {}", err); - ::Error::new_body_write(err) + crate::Error::new_body_write(err) }) } @@ -367,7 +367,7 @@ where Bs: Payload, { type Item = Dispatched; - type Error = ::Error; + type Error = crate::Error; #[inline] fn poll(&mut self) -> Poll { @@ -421,7 +421,7 @@ where } } - fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> { + fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> { let (msg, body) = msg?; let mut req = Request::new(body); *req.method_mut() = msg.subject.0; @@ -501,7 +501,7 @@ where } } - fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> { + fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> { match msg { Ok((msg, body)) => { if let Some(cb) = self.callback.take() { @@ -515,7 +515,7 @@ where // Getting here is likely a bug! An error should have happened // in Conn::require_empty_read() before ever parsing a // full message! - Err(::Error::new_unexpected_message()) + Err(crate::Error::new_unexpected_message()) } }, Err(err) => { @@ -526,7 +526,7 @@ where trace!("canceling queued request with connection error: {}", err); // in this case, the message was never even started, so it's safe to tell // the user that the request was completely canceled - let _ = cb.send(Err((::Error::new_canceled().with(err), Some(req)))); + let _ = cb.send(Err((crate::Error::new_canceled().with(err), Some(req)))); Ok(()) } else { Err(err) @@ -559,8 +559,8 @@ mod tests { extern crate pretty_env_logger; use super::*; - use mock::AsyncIo; - use proto::h1::ClientTransaction; + use crate::mock::AsyncIo; + use crate::proto::h1::ClientTransaction; #[test] fn client_read_bytes_before_writing_request() { @@ -569,8 +569,8 @@ mod tests { // Block at 0 for now, but we will release this response before // the request is ready to write later... let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0); - let (mut tx, rx) = ::client::dispatch::channel(); - let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io); + let (mut tx, rx) = crate::client::dispatch::channel(); + let conn = Conn::<_, crate::Chunk, ClientTransaction>::new(io); let mut dispatcher = Dispatcher::new(Client::new(rx), conn); // First poll is needed to allow tx to send... @@ -578,7 +578,7 @@ mod tests { // Unblock our IO, which has a response before we've sent request! dispatcher.conn.io_mut().block_in(100); - let res_rx = tx.try_send(::Request::new(::Body::empty())).unwrap(); + let res_rx = tx.try_send(crate::Request::new(crate::Body::empty())).unwrap(); let a1 = dispatcher.poll().expect("error should be sent on channel"); assert!(a1.is_ready(), "dispatcher should be closed"); @@ -587,7 +587,7 @@ mod tests { .expect_err("callback response"); match (err.0.kind(), err.1) { - (&::error::Kind::Canceled, Some(_)) => (), + (&crate::error::Kind::Canceled, Some(_)) => (), other => panic!("expected Canceled, got {:?}", other), } Ok::<(), ()>(()) @@ -599,16 +599,16 @@ mod tests { let _ = pretty_env_logger::try_init(); ::futures::lazy(|| { let io = AsyncIo::new_buf(vec![], 0); - let (mut tx, rx) = ::client::dispatch::channel(); - let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io); + let (mut tx, rx) = crate::client::dispatch::channel(); + let conn = Conn::<_, crate::Chunk, ClientTransaction>::new(io); let mut dispatcher = Dispatcher::new(Client::new(rx), conn); // First poll is needed to allow tx to send... assert!(dispatcher.poll().expect("nothing is ready").is_not_ready()); - let body = ::Body::wrap_stream(::futures::stream::once(Ok::<_, ::Error>(""))); + let body = crate::Body::wrap_stream(::futures::stream::once(Ok::<_, crate::Error>(""))); - let _res_rx = tx.try_send(::Request::new(body)).unwrap(); + let _res_rx = tx.try_send(crate::Request::new(body)).unwrap(); dispatcher.poll().expect("empty body shouldn't panic"); Ok::<(), ()>(()) diff --git a/src/proto/h1/encode.rs b/src/proto/h1/encode.rs index fe7c0025..56ed2eec 100644 --- a/src/proto/h1/encode.rs +++ b/src/proto/h1/encode.rs @@ -4,7 +4,7 @@ use bytes::{Buf, IntoBuf}; use bytes::buf::{Chain, Take}; use iovec::IoVec; -use common::StaticBuf; +use crate::common::StaticBuf; use super::io::WriteBuf; /// Encoders to handle different Transfer-Encodings. diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 3535ae43..4bf4da13 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -136,7 +136,7 @@ where } pub(super) fn parse(&mut self, ctx: ParseContext) - -> Poll, ::Error> + -> Poll, crate::Error> where S: Http1Transaction, { @@ -153,14 +153,14 @@ where let max = self.read_buf_strategy.max(); if self.read_buf.len() >= max { debug!("max_buf_size ({}) reached, closing", max); - return Err(::Error::new_too_large()); + return Err(crate::Error::new_too_large()); } }, } - match try_ready!(self.read_from_io().map_err(::Error::new_io)) { + match try_ready!(self.read_from_io().map_err(crate::Error::new_io)) { 0 => { trace!("parse eof"); - return Err(::Error::new_incomplete()); + return Err(crate::Error::new_incomplete()); } _ => {}, } @@ -651,13 +651,13 @@ impl Buf for BufDeque { mod tests { use super::*; use std::io::Read; - use mock::AsyncIo; + use crate::mock::AsyncIo; #[cfg(feature = "nightly")] use test::Bencher; #[cfg(test)] - impl MemRead for ::mock::AsyncIo { + impl MemRead for crate::mock::AsyncIo { fn read_mem(&mut self, len: usize) -> Poll { let mut v = vec![0; len]; let n = try_nb!(self.read(v.as_mut_slice())); @@ -689,7 +689,7 @@ mod tests { cached_headers: &mut None, req_method: &mut None, }; - assert!(buffered.parse::<::proto::h1::ClientTransaction>(ctx).unwrap().is_not_ready()); + assert!(buffered.parse::(ctx).unwrap().is_not_ready()); assert!(buffered.io.blocked()); } @@ -890,10 +890,10 @@ mod tests { let s = "Hello, World!"; b.bytes = s.len() as u64; - let mut write_buf = WriteBuf::<::Chunk>::new(); + let mut write_buf = WriteBuf::::new(); write_buf.set_strategy(WriteStrategy::Flatten); b.iter(|| { - let chunk = ::Chunk::from(s); + let chunk = crate::Chunk::from(s); write_buf.buffer(chunk); ::test::black_box(&write_buf); write_buf.headers.bytes.clear(); diff --git a/src/proto/h1/mod.rs b/src/proto/h1/mod.rs index 5ecbb635..2784ed4f 100644 --- a/src/proto/h1/mod.rs +++ b/src/proto/h1/mod.rs @@ -1,7 +1,7 @@ use bytes::BytesMut; use http::{HeaderMap, Method}; -use proto::{MessageHead, BodyLength, DecodedLength}; +use crate::proto::{MessageHead, BodyLength, DecodedLength}; pub(crate) use self::conn::Conn; pub(crate) use self::dispatch::Dispatcher; @@ -27,9 +27,9 @@ pub(crate) trait Http1Transaction { type Outgoing: Default; const LOG: &'static str; fn parse(bytes: &mut BytesMut, ctx: ParseContext) -> ParseResult; - fn encode(enc: Encode, dst: &mut Vec) -> ::Result; + fn encode(enc: Encode, dst: &mut Vec) -> crate::Result; - fn on_error(err: &::Error) -> Option>; + fn on_error(err: &crate::Error) -> Option>; fn is_client() -> bool { !Self::is_server() @@ -51,7 +51,7 @@ pub(crate) trait Http1Transaction { } /// Result newtype for Http1Transaction::parse. -pub(crate) type ParseResult = Result>, ::error::Parse>; +pub(crate) type ParseResult = Result>, crate::error::Parse>; #[derive(Debug)] pub(crate) struct ParsedMessage { diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index e4cb1154..9aecb8dd 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -10,10 +10,10 @@ use http::header::{self, Entry, HeaderName, HeaderValue}; use http::{HeaderMap, Method, StatusCode, Version}; use httparse; -use error::Parse; -use headers; -use proto::{BodyLength, DecodedLength, MessageHead, RequestLine, RequestHead}; -use proto::h1::{Encode, Encoder, Http1Transaction, ParseResult, ParseContext, ParsedMessage, date}; +use crate::error::Parse; +use crate::headers; +use crate::proto::{BodyLength, DecodedLength, MessageHead, RequestLine, RequestHead}; +use crate::proto::h1::{Encode, Encoder, Http1Transaction, ParseResult, ParseContext, ParsedMessage, date}; const MAX_HEADERS: usize = 100; const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific @@ -239,7 +239,7 @@ impl Http1Transaction for Server { })) } - fn encode(mut msg: Encode, mut dst: &mut Vec) -> ::Result { + fn encode(mut msg: Encode, mut dst: &mut Vec) -> crate::Result { trace!( "Server::encode status={:?}, body={:?}, req_method={:?}", msg.head.subject, @@ -266,7 +266,7 @@ impl Http1Transaction for Server { *msg.head = MessageHead::default(); msg.head.subject = StatusCode::INTERNAL_SERVER_ERROR; msg.body = None; - (Err(::Error::new_user_unsupported_status_code()), true) + (Err(crate::Error::new_user_unsupported_status_code()), true) } else { (Ok(()), !msg.keep_alive) }; @@ -309,7 +309,7 @@ impl Http1Transaction for Server { if wrote_len { warn!("unexpected content-length found, canceling"); rewind(dst); - return Err(::Error::new_user_header()); + return Err(crate::Error::new_user_header()); } match msg.body { Some(BodyLength::Known(known_len)) => { @@ -369,7 +369,7 @@ impl Http1Transaction for Server { if fold.0 != len { warn!("multiple Content-Length values found: [{}, {}]", fold.0, len); rewind(dst); - return Err(::Error::new_user_header()); + return Err(crate::Error::new_user_header()); } folded = Some(fold); } else { @@ -378,7 +378,7 @@ impl Http1Transaction for Server { } else { warn!("illegal Content-Length value: {:?}", value); rewind(dst); - return Err(::Error::new_user_header()); + return Err(crate::Error::new_user_header()); } } if let Some((len, value)) = folded { @@ -418,7 +418,7 @@ impl Http1Transaction for Server { if wrote_len { warn!("unexpected transfer-encoding found, canceling"); rewind(dst); - return Err(::Error::new_user_header()); + return Err(crate::Error::new_user_header()); } // check that we actually can send a chunked body... if msg.head.version == Version::HTTP_10 || !Server::can_chunked(msg.req_method, msg.head.subject) { @@ -531,8 +531,8 @@ impl Http1Transaction for Server { ret.map(|()| encoder.set_last(is_last)) } - fn on_error(err: &::Error) -> Option> { - use ::error::Kind; + fn on_error(err: &crate::Error) -> Option> { + use crate::error::Kind; let status = match *err.kind() { Kind::Parse(Parse::Method) | Kind::Parse(Parse::Header) | @@ -666,7 +666,7 @@ impl Http1Transaction for Client { } } - fn encode(msg: Encode, dst: &mut Vec) -> ::Result { + fn encode(msg: Encode, dst: &mut Vec) -> crate::Result { trace!("Client::encode method={:?}, body={:?}", msg.head.subject.0, msg.body); *msg.req_method = Some(msg.head.subject.0.clone()); @@ -704,7 +704,7 @@ impl Http1Transaction for Client { Ok(body) } - fn on_error(_err: &::Error) -> Option> { + fn on_error(_err: &crate::Error) -> Option> { // we can't tell the server about any errors it creates None } @@ -937,7 +937,7 @@ fn record_header_indices( bytes: &[u8], headers: &[httparse::Header], indices: &mut [HeaderIndices] -) -> Result<(), ::error::Parse> { +) -> Result<(), crate::error::Parse> { let bytes_ptr = bytes.as_ptr() as usize; // FIXME: This should be a single plain `for` loop. @@ -966,7 +966,7 @@ fn record_header_indices( { if header.name.len() >= (1 << 16) { debug!("header name larger than 64kb: {:?}", header.name); - return Err(::error::Parse::TooLarge); + return Err(crate::error::Parse::TooLarge); } let name_start = header.name.as_ptr() as usize - bytes_ptr; let name_end = name_start + header.name.len(); @@ -1071,12 +1071,12 @@ mod tests { req_method: &mut method, }).unwrap().unwrap(); assert_eq!(raw.len(), 0); - assert_eq!(msg.head.subject.0, ::Method::GET); + assert_eq!(msg.head.subject.0, crate::Method::GET); assert_eq!(msg.head.subject.1, "/echo"); - assert_eq!(msg.head.version, ::Version::HTTP_11); + assert_eq!(msg.head.version, crate::Version::HTTP_11); assert_eq!(msg.head.headers.len(), 1); assert_eq!(msg.head.headers["Host"], "hyper.rs"); - assert_eq!(method, Some(::Method::GET)); + assert_eq!(method, Some(crate::Method::GET)); } @@ -1087,12 +1087,12 @@ mod tests { let mut raw = BytesMut::from(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec()); let ctx = ParseContext { cached_headers: &mut None, - req_method: &mut Some(::Method::GET), + req_method: &mut Some(crate::Method::GET), }; let msg = Client::parse(&mut raw, ctx).unwrap().unwrap(); assert_eq!(raw.len(), 0); - assert_eq!(msg.head.subject, ::StatusCode::OK); - assert_eq!(msg.head.version, ::Version::HTTP_11); + assert_eq!(msg.head.subject, crate::StatusCode::OK); + assert_eq!(msg.head.version, crate::Version::HTTP_11); assert_eq!(msg.head.headers.len(), 1); assert_eq!(msg.head.headers["Content-Length"], "0"); } @@ -1120,7 +1120,7 @@ mod tests { .expect("parse complete") } - fn parse_err(s: &str, comment: &str) -> ::error::Parse { + fn parse_err(s: &str, comment: &str) -> crate::error::Parse { let mut bytes = BytesMut::from(s); Server::parse(&mut bytes, ParseContext { cached_headers: &mut None, @@ -1266,7 +1266,7 @@ mod tests { .expect("parse complete") } - fn parse_err(s: &str) -> ::error::Parse { + fn parse_err(s: &str) -> crate::error::Parse { let mut bytes = BytesMut::from(s); Client::parse(&mut bytes, ParseContext { cached_headers: &mut None, @@ -1423,7 +1423,7 @@ mod tests { #[test] fn test_client_request_encode_title_case() { use http::header::HeaderValue; - use proto::BodyLength; + use crate::proto::BodyLength; let mut head = MessageHead::default(); head.headers.insert("content-length", HeaderValue::from_static("10")); @@ -1553,7 +1553,7 @@ mod tests { #[bench] fn bench_server_encode_headers_preset(b: &mut Bencher) { use http::header::HeaderValue; - use proto::BodyLength; + use crate::proto::BodyLength; let len = 108; b.bytes = len as u64; @@ -1581,7 +1581,7 @@ mod tests { #[cfg(feature = "nightly")] #[bench] fn bench_server_encode_no_headers(b: &mut Bencher) { - use proto::BodyLength; + use crate::proto::BodyLength; let len = 76; b.bytes = len as u64; diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 491a9165..a488a412 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -5,15 +5,15 @@ use futures::sync::{mpsc, oneshot}; use h2::client::{Builder, Handshake, SendRequest}; use tokio_io::{AsyncRead, AsyncWrite}; -use headers::content_length_parse_all; -use body::Payload; -use ::common::{Exec, Never}; -use headers; -use ::proto::Dispatched; +use crate::headers::content_length_parse_all; +use crate::body::Payload; +use crate::common::{Exec, Never}; +use crate::headers; +use crate::proto::Dispatched; use super::{PipeToSendStream, SendBuf}; -use ::{Body, Request, Response}; +use crate::{Body, Request, Response}; -type ClientRx = ::client::dispatch::Receiver, Response>; +type ClientRx = crate::client::dispatch::Receiver, Response>; /// An mpsc channel is used to help notify the `Connection` task when *all* /// other handles to it have been dropped, so that it can shutdown. type ConnDropRef = mpsc::Sender; @@ -58,13 +58,13 @@ where B: Payload + 'static, { type Item = Dispatched; - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { loop { let next = match self.state { State::Handshaking(ref mut h) => { - let (request_tx, conn) = try_ready!(h.poll().map_err(::Error::new_h2)); + let (request_tx, conn) = try_ready!(h.poll().map_err(crate::Error::new_h2)); // An mpsc channel is used entirely to detect when the // 'Client' has been dropped. This is to get around a bug // in h2 where dropping all SendRequests won't notify a @@ -111,7 +111,7 @@ where trace!("connection gracefully shutdown"); Ok(Async::Ready(Dispatched::Shutdown)) } else { - Err(::Error::new_h2(err)) + Err(crate::Error::new_h2(err)) }; } } @@ -133,7 +133,7 @@ where Ok(ok) => ok, Err(err) => { debug!("client send request error: {}", err); - cb.send(Err((::Error::new_h2(err), None))); + cb.send(Err((crate::Error::new_h2(err), None))); continue; } }; @@ -162,12 +162,12 @@ where Ok(res) => { let content_length = content_length_parse_all(res.headers()); let res = res.map(|stream| - ::Body::h2(stream, content_length)); + crate::Body::h2(stream, content_length)); Ok(res) }, Err(err) => { debug!("client response error: {}", err); - Err((::Error::new_h2(err), None)) + Err((crate::Error::new_h2(err), None)) } } }); diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index 3fe88389..07792d20 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -7,7 +7,7 @@ use http::header::{ }; use http::HeaderMap; -use body::Payload; +use crate::body::Payload; mod client; pub(crate) mod server; @@ -91,18 +91,18 @@ where } } - fn on_user_err(&mut self, err: S::Error) -> ::Error { - let err = ::Error::new_user_body(err); + fn on_user_err(&mut self, err: S::Error) -> crate::Error { + let err = crate::Error::new_user_body(err); debug!("send body user stream error: {}", err); self.body_tx.send_reset(err.h2_reason()); err } - fn send_eos_frame(&mut self) -> ::Result<()> { + fn send_eos_frame(&mut self) -> crate::Result<()> { trace!("send body eos"); self.body_tx .send_data(SendBuf(None), true) - .map_err(::Error::new_body_write) + .map_err(crate::Error::new_body_write) } } @@ -111,7 +111,7 @@ where S: Payload, { type Item = (); - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { loop { @@ -123,18 +123,18 @@ where if self.body_tx.capacity() == 0 { loop { - match try_ready!(self.body_tx.poll_capacity().map_err(::Error::new_body_write)) { + match try_ready!(self.body_tx.poll_capacity().map_err(crate::Error::new_body_write)) { Some(0) => {} Some(_) => break, - None => return Err(::Error::new_canceled()), + None => return Err(crate::Error::new_canceled()), } } } else { if let Async::Ready(reason) = - self.body_tx.poll_reset().map_err(::Error::new_body_write)? + self.body_tx.poll_reset().map_err(crate::Error::new_body_write)? { debug!("stream received RST_STREAM: {:?}", reason); - return Err(::Error::new_body_write(::h2::Error::from(reason))); + return Err(crate::Error::new_body_write(::h2::Error::from(reason))); } } @@ -150,7 +150,7 @@ where let buf = SendBuf(Some(chunk)); self.body_tx .send_data(buf, is_eos) - .map_err(::Error::new_body_write)?; + .map_err(crate::Error::new_body_write)?; if is_eos { return Ok(Async::Ready(())); @@ -169,17 +169,17 @@ where } } else { if let Async::Ready(reason) = - self.body_tx.poll_reset().map_err(|e| ::Error::new_body_write(e))? + self.body_tx.poll_reset().map_err(|e| crate::Error::new_body_write(e))? { debug!("stream received RST_STREAM: {:?}", reason); - return Err(::Error::new_body_write(::h2::Error::from(reason))); + return Err(crate::Error::new_body_write(::h2::Error::from(reason))); } match try_ready!(self.stream.poll_trailers().map_err(|e| self.on_user_err(e))) { Some(trailers) => { self.body_tx .send_trailers(trailers) - .map_err(::Error::new_body_write)?; + .map_err(crate::Error::new_body_write)?; return Ok(Async::Ready(())); } None => { diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 3d21d193..3d049847 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -5,16 +5,16 @@ use h2::Reason; use h2::server::{Builder, Connection, Handshake, SendResponse}; use tokio_io::{AsyncRead, AsyncWrite}; -use ::headers::content_length_parse_all; -use ::body::Payload; -use body::internal::FullDataArg; -use ::common::exec::H2Exec; -use ::headers; -use ::service::Service; -use ::proto::Dispatched; +use crate::headers::content_length_parse_all; +use crate::body::Payload; +use crate::body::internal::FullDataArg; +use crate::common::exec::H2Exec; +use crate::headers; +use crate::service::Service; +use crate::proto::Dispatched; use super::{PipeToSendStream, SendBuf}; -use ::{Body, Response}; +use crate::{Body, Response}; pub(crate) struct Server where @@ -40,7 +40,7 @@ where B: Payload, { conn: Connection>, - closing: Option<::Error>, + closing: Option, } @@ -90,13 +90,13 @@ where E: H2Exec, { type Item = Dispatched; - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { loop { let next = match self.state { State::Handshaking(ref mut h) => { - let conn = try_ready!(h.poll().map_err(::Error::new_h2)); + let conn = try_ready!(h.poll().map_err(crate::Error::new_h2)); State::Serving(Serving { conn, closing: None, @@ -122,7 +122,7 @@ where T: AsyncRead + AsyncWrite, B: Payload, { - fn poll_server(&mut self, service: &mut S, exec: &E) -> Poll<(), ::Error> + fn poll_server(&mut self, service: &mut S, exec: &E) -> Poll<(), crate::Error> where S: Service< ReqBody=Body, @@ -138,12 +138,12 @@ where Ok(Async::Ready(())) => (), Ok(Async::NotReady) => { // use `poll_close` instead of `poll`, in order to avoid accepting a request. - try_ready!(self.conn.poll_close().map_err(::Error::new_h2)); + try_ready!(self.conn.poll_close().map_err(crate::Error::new_h2)); trace!("incoming connection complete"); return Ok(Async::Ready(())); } Err(err) => { - let err = ::Error::new_user_service(err); + let err = crate::Error::new_user_service(err); debug!("service closed: {}", err); let reason = err.h2_reason(); @@ -161,11 +161,11 @@ where } // When the service is ready, accepts an incoming request. - if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) { + if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(crate::Error::new_h2)) { trace!("incoming request"); let content_length = content_length_parse_all(req.headers()); let req = req.map(|stream| { - ::Body::h2(stream, content_length) + crate::Body::h2(stream, content_length) }); let fut = H2Stream::new(service.call(req), respond); exec.execute_h2stream(fut)?; @@ -179,7 +179,7 @@ where debug_assert!(self.closing.is_some(), "poll_server broke loop without closing"); - try_ready!(self.conn.poll_close().map_err(::Error::new_h2)); + try_ready!(self.conn.poll_close().map_err(crate::Error::new_h2)); Err(self.closing.take().expect("polled after error")) } @@ -215,7 +215,7 @@ where } } - fn poll2(&mut self) -> Poll<(), ::Error> { + fn poll2(&mut self) -> Poll<(), crate::Error> { loop { let next = match self.state { H2StreamState::Service(ref mut h) => { @@ -225,15 +225,15 @@ where // Body is not yet ready, so we want to check if the client has sent a // RST_STREAM frame which would cancel the current request. if let Async::Ready(reason) = - self.reply.poll_reset().map_err(|e| ::Error::new_h2(e))? + self.reply.poll_reset().map_err(|e| crate::Error::new_h2(e))? { debug!("stream received RST_STREAM: {:?}", reason); - return Err(::Error::new_h2(reason.into())); + return Err(crate::Error::new_h2(reason.into())); } return Ok(Async::NotReady); } Err(e) => { - let err = ::Error::new_user_service(e); + let err = crate::Error::new_user_service(e); warn!("http2 service errored: {}", err); self.reply.send_reset(err.h2_reason()); return Err(err); @@ -249,7 +249,7 @@ where .headers_mut() .entry(::http::header::DATE) .expect("DATE is a valid HeaderName") - .or_insert_with(::proto::h1::date::update_and_header_value); + .or_insert_with(crate::proto::h1::date::update_and_header_value); macro_rules! reply { ($eos:expr) => ({ @@ -258,7 +258,7 @@ where Err(e) => { debug!("send response error: {}", e); self.reply.send_reset(Reason::INTERNAL_ERROR); - return Err(::Error::new_h2(e)); + return Err(crate::Error::new_h2(e)); } } }) @@ -274,7 +274,7 @@ where let buf = SendBuf(Some(full)); body_tx .send_data(buf, true) - .map_err(::Error::new_body_write)?; + .map_err(crate::Error::new_body_write)?; return Ok(Async::Ready(())); } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 0e0cd62c..dd33ac21 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -40,7 +40,7 @@ pub(crate) enum Dispatched { /// Dispatcher completely shutdown connection. Shutdown, /// Dispatcher has pending upgrade, and so did not shutdown. - Upgrade(::upgrade::Pending), + Upgrade(crate::upgrade::Pending), } /// A separate module to encapsulate the invariants of the DecodedLength type. @@ -83,12 +83,12 @@ mod body_length { } /// Checks the `u64` is within the maximum allowed for content-length. - pub(crate) fn checked_new(len: u64) -> Result { + pub(crate) fn checked_new(len: u64) -> Result { if len <= MAX_LEN { Ok(DecodedLength(len)) } else { warn!("content-length bigger than maximum: {} > {}", len, MAX_LEN); - Err(::error::Parse::TooLarge) + Err(crate::error::Parse::TooLarge) } } } diff --git a/src/server/conn.rs b/src/server/conn.rs index ad6a2e8e..dad7451b 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -22,13 +22,13 @@ use h2; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "runtime")] use tokio_reactor::Handle; -use body::{Body, Payload}; -use common::exec::{Exec, H2Exec, NewSvcExec}; -use common::io::Rewind; -use error::{Kind, Parse}; -use proto; -use service::{MakeServiceRef, Service}; -use upgrade::Upgraded; +use crate::body::{Body, Payload}; +use crate::common::exec::{Exec, H2Exec, NewSvcExec}; +use crate::common::io::Rewind; +use crate::error::{Kind, Parse}; +use crate::proto; +use crate::service::{MakeServiceRef, Service}; +use crate::upgrade::Upgraded; pub(super) use self::spawn_all::NoopWatcher; use self::spawn_all::NewSvcTask; @@ -413,7 +413,7 @@ impl Http { /// `make_service` object provided, creating a new service per /// connection. #[cfg(feature = "runtime")] - pub fn serve_addr(&self, addr: &SocketAddr, make_service: S) -> ::Result> + pub fn serve_addr(&self, addr: &SocketAddr, make_service: S) -> crate::Result> where S: MakeServiceRef< AddrStream, @@ -438,7 +438,7 @@ impl Http { /// `make_service` object provided, creating a new service per /// connection. #[cfg(feature = "runtime")] - pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, make_service: S) -> ::Result> + pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, make_service: S) -> crate::Result> where S: MakeServiceRef< AddrStream, @@ -547,7 +547,7 @@ where /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html) /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) /// to work with this function; or use the `without_shutdown` wrapper. - pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> { + pub fn poll_without_shutdown(&mut self) -> Poll<(), crate::Error> { loop { let polled = match *self.conn.as_mut().unwrap() { Either::A(ref mut h1) => h1.poll_without_shutdown(), @@ -570,9 +570,9 @@ where /// Prevent shutdown of the underlying IO object at the end of service the request, /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`. - pub fn without_shutdown(self) -> impl Future, Error=::Error> { + pub fn without_shutdown(self) -> impl Future, Error=crate::Error> { let mut conn = Some(self); - ::futures::future::poll_fn(move || -> ::Result<_> { + ::futures::future::poll_fn(move || -> crate::Result<_> { try_ready!(conn.as_mut().unwrap().poll_without_shutdown()); Ok(conn.take().unwrap().into_parts().into()) }) @@ -629,7 +629,7 @@ where E: H2Exec, { type Item = (); - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { loop { @@ -701,7 +701,7 @@ where E: H2Exec<::Future, B>, { type Item = Connecting; - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll, Self::Error> { match self.make_service.poll_ready_ref() { @@ -709,11 +709,11 @@ where Ok(Async::NotReady) => return Ok(Async::NotReady), Err(e) => { trace!("make_service closed"); - return Err(::Error::new_user_make_service(e)); + return Err(crate::Error::new_user_make_service(e)); } } - if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) { + if let Some(io) = try_ready!(self.incoming.poll().map_err(crate::Error::new_accept)) { let new_fut = self.make_service.make_service_ref(&io); Ok(Async::Ready(Some(Connecting { future: new_fut, @@ -774,7 +774,7 @@ where B: Payload, E: H2Exec<::Future, B>, { - pub(super) fn poll_watch(&mut self, watcher: &W) -> Poll<(), ::Error> + pub(super) fn poll_watch(&mut self, watcher: &W) -> Poll<(), crate::Error> where E: NewSvcExec, W: Watcher, @@ -795,9 +795,9 @@ pub(crate) mod spawn_all { use futures::{Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; - use body::{Body, Payload}; - use common::exec::H2Exec; - use service::Service; + use crate::body::{Body, Payload}; + use crate::common::exec::H2Exec; + use crate::service::Service; use super::{Connecting, UpgradeableConnection}; // Used by `SpawnAll` to optionally watch a `Connection` future. @@ -809,7 +809,7 @@ pub(crate) mod spawn_all { // connections, and signal that they start to shutdown when prompted, so // it has a `GracefulWatcher` implementation to do that. pub trait Watcher: Clone { - type Future: Future; + type Future: Future; fn watch(&self, conn: UpgradeableConnection) -> Self::Future; } @@ -878,7 +878,7 @@ pub(crate) mod spawn_all { let conn = try_ready!(connecting .poll() .map_err(|err| { - let err = ::Error::new_user_make_service(err); + let err = crate::Error::new_user_make_service(err); debug!("connecting error: {}", err); })); let connected = watcher.watch(conn.with_upgrades()); @@ -941,7 +941,7 @@ mod upgrades { E: super::H2Exec, { type Item = (); - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { loop { diff --git a/src/server/mod.rs b/src/server/mod.rs index b55af698..d7132b07 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -64,9 +64,9 @@ use futures::{Future, Stream, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "runtime")] use tokio_reactor; -use body::{Body, Payload}; -use common::exec::{Exec, H2Exec, NewSvcExec}; -use service::{MakeServiceRef, Service}; +use crate::body::{Body, Payload}; +use crate::common::exec::{Exec, H2Exec, NewSvcExec}; +use crate::service::{MakeServiceRef, Service}; // Renamed `Http` as `Http_` for now so that people upgrading don't see an // error that `hyper::server::Http` is private... use self::conn::{Http as Http_, NoopWatcher, SpawnAll}; @@ -119,13 +119,13 @@ impl Server { } /// Tries to bind to the provided address, and returns a [`Builder`](Builder). - pub fn try_bind(addr: &SocketAddr) -> ::Result> { + pub fn try_bind(addr: &SocketAddr) -> crate::Result> { AddrIncoming::new(addr, None) .map(Server::builder) } /// Create a new instance from a `std::net::TcpListener` instance. - pub fn from_tcp(listener: StdTcpListener) -> Result, ::Error> { + pub fn from_tcp(listener: StdTcpListener) -> Result, crate::Error> { let handle = tokio_reactor::Handle::default(); AddrIncoming::from_std(listener, &handle) .map(Server::builder) @@ -212,7 +212,7 @@ where E: NewSvcExec, { type Item = (); - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { self.spawn_all.poll_watch(&NoopWatcher) diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index 07ab6874..351a25e0 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -3,10 +3,10 @@ use std::error::Error as StdError; use futures::{Async, Future, Stream, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; -use body::{Body, Payload}; -use common::drain::{self, Draining, Signal, Watch, Watching}; -use common::exec::{H2Exec, NewSvcExec}; -use service::{MakeServiceRef, Service}; +use crate::body::{Body, Payload}; +use crate::common::drain::{self, Draining, Signal, Watch, Watching}; +use crate::common::exec::{H2Exec, NewSvcExec}; +use crate::service::{MakeServiceRef, Service}; use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; #[allow(missing_debug_implementations)] @@ -51,7 +51,7 @@ where E: NewSvcExec, { type Item = (); - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { loop { diff --git a/src/server/tcp.rs b/src/server/tcp.rs index 48143bac..29813132 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -22,9 +22,9 @@ pub struct AddrIncoming { } impl AddrIncoming { - pub(super) fn new(addr: &SocketAddr, handle: Option<&Handle>) -> ::Result { + pub(super) fn new(addr: &SocketAddr, handle: Option<&Handle>) -> crate::Result { let std_listener = StdTcpListener::bind(addr) - .map_err(::Error::new_listen)?; + .map_err(crate::Error::new_listen)?; if let Some(handle) = handle { AddrIncoming::from_std(std_listener, handle) @@ -34,10 +34,10 @@ impl AddrIncoming { } } - pub(super) fn from_std(std_listener: StdTcpListener, handle: &Handle) -> ::Result { + pub(super) fn from_std(std_listener: StdTcpListener, handle: &Handle) -> crate::Result { let listener = TcpListener::from_std(std_listener, &handle) - .map_err(::Error::new_listen)?; - let addr = listener.local_addr().map_err(::Error::new_listen)?; + .map_err(crate::Error::new_listen)?; + let addr = listener.local_addr().map_err(crate::Error::new_listen)?; Ok(AddrIncoming { listener, addr: addr, @@ -49,7 +49,7 @@ impl AddrIncoming { } /// Creates a new `AddrIncoming` binding to provided socket address. - pub fn bind(addr: &SocketAddr) -> ::Result { + pub fn bind(addr: &SocketAddr) -> crate::Result { AddrIncoming::new(addr, None) } diff --git a/src/service/make_service.rs b/src/service/make_service.rs index d5c7e477..bd62a2de 100644 --- a/src/service/make_service.rs +++ b/src/service/make_service.rs @@ -3,7 +3,7 @@ use std::fmt; use futures::{Async, Future, IntoFuture, Poll}; -use body::Payload; +use crate::body::Payload; use super::Service; /// An asynchronous constructor of `Service`s. diff --git a/src/service/new_service.rs b/src/service/new_service.rs index 79873c5a..de6954e3 100644 --- a/src/service/new_service.rs +++ b/src/service/new_service.rs @@ -2,7 +2,7 @@ use std::error::Error as StdError; use futures::{Async, Future, IntoFuture, Poll}; -use body::Payload; +use crate::body::Payload; use super::{MakeService, Service}; /// An asynchronous constructor of `Service`s. diff --git a/src/service/service.rs b/src/service/service.rs index 41d47bd0..92681ff2 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -4,9 +4,9 @@ use std::marker::PhantomData; use futures::{future, Async, Future, IntoFuture, Poll}; -use body::Payload; -use common::Never; -use ::{Request, Response}; +use crate::body::Payload; +use crate::common::Never; +use crate::{Request, Response}; /// An asynchronous function from `Request` to `Response`. pub trait Service { @@ -179,16 +179,16 @@ fn _assert_fn_mut() { let mut val = 0; - let svc = service_fn(move |_req: Request<::Body>| { + let svc = service_fn(move |_req: Request| { val += 1; - future::ok::<_, Never>(Response::new(::Body::empty())) + future::ok::<_, Never>(Response::new(crate::Body::empty())) }); assert_service(&svc); - let svc = service_fn_ok(move |_req: Request<::Body>| { + let svc = service_fn_ok(move |_req: Request| { val += 1; - Response::new(::Body::empty()) + Response::new(crate::Body::empty()) }); assert_service(&svc); diff --git a/src/upgrade.rs b/src/upgrade.rs index 63646fb2..2f62077e 100644 --- a/src/upgrade.rs +++ b/src/upgrade.rs @@ -15,7 +15,7 @@ use futures::{Async, Future, Poll}; use futures::sync::oneshot; use tokio_io::{AsyncRead, AsyncWrite}; -use common::io::Rewind; +use crate::common::io::Rewind; /// An upgraded HTTP connection. /// @@ -33,7 +33,7 @@ pub struct Upgraded { /// /// If no upgrade was available, or it doesn't succeed, yields an `Error`. pub struct OnUpgrade { - rx: Option>>, + rx: Option>>, } /// The deconstructed parts of an [`Upgraded`](Upgraded) type. @@ -57,7 +57,7 @@ pub struct Parts { } pub(crate) struct Pending { - tx: oneshot::Sender<::Result> + tx: oneshot::Sender> } /// Error cause returned when an upgrade was expected but canceled @@ -200,7 +200,7 @@ impl OnUpgrade { impl Future for OnUpgrade { type Item = Upgraded; - type Error = ::Error; + type Error = crate::Error; fn poll(&mut self) -> Poll { match self.rx { @@ -209,10 +209,10 @@ impl Future for OnUpgrade { Ok(Async::Ready(Ok(upgraded))) => Ok(Async::Ready(upgraded)), Ok(Async::Ready(Err(err))) => Err(err), Err(_oneshot_canceled) => Err( - ::Error::new_canceled().with(UpgradeExpected(())) + crate::Error::new_canceled().with(UpgradeExpected(())) ), }, - None => Err(::Error::new_user_no_upgrade()), + None => Err(crate::Error::new_user_no_upgrade()), } } } @@ -236,7 +236,7 @@ impl Pending { /// upgrades are handled manually. pub(crate) fn manual(self) { trace!("pending upgrade handled manually"); - let _ = self.tx.send(Err(::Error::new_user_manual_upgrade())); + let _ = self.tx.send(Err(crate::Error::new_user_manual_upgrade())); } } diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 96d8e5ad..11437f96 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -5,9 +5,9 @@ pub extern crate tokio; use std::sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}}; use std::time::Duration; -use hyper::{Body, Client, Request, Response, Server, Version}; -use hyper::client::HttpConnector; -use hyper::service::service_fn; +use crate::hyper::{Body, Client, Request, Response, Server, Version}; +use crate::hyper::client::HttpConnector; +use crate::hyper::service::service_fn; pub use std::net::SocketAddr; pub use self::futures::{future, Future, Stream}; @@ -207,12 +207,12 @@ macro_rules! __internal_headers_map { macro_rules! __internal_headers_eq { (@pat $name: expr, $pat:pat) => { - ::std::sync::Arc::new(move |__hdrs: &::hyper::HeaderMap| { + ::std::sync::Arc::new(move |__hdrs: &crate::hyper::HeaderMap| { match __hdrs.get($name) { $pat => (), other => panic!("headers[{}] was not {}: {:?}", stringify!($name), stringify!($pat), other), } - }) as ::std::sync::Arc + }) as ::std::sync::Arc }; (@val $name: expr, NONE) => { __internal_headers_eq!(@pat $name, None); @@ -222,13 +222,13 @@ macro_rules! __internal_headers_eq { }; (@val $name: expr, $val:expr) => ({ let __val = Option::from($val); - ::std::sync::Arc::new(move |__hdrs: &::hyper::HeaderMap| { + ::std::sync::Arc::new(move |__hdrs: &crate::hyper::HeaderMap| { if let Some(ref val) = __val { assert_eq!(__hdrs.get($name).expect(stringify!($name)), val.to_string().as_str(), stringify!($name)); } else { assert_eq!(__hdrs.get($name), None, stringify!($name)); } - }) as ::std::sync::Arc + }) as ::std::sync::Arc }); ($headers:ident, { $($name:expr => $val:tt,)* }) => { $( @@ -378,7 +378,7 @@ pub fn __run_test(cfg: __TestConfig) { .map_err(|never| -> hyper::Error { match never {} }) .flatten() .map_err(|e| panic!("server connection error: {}", e)); - ::tokio::spawn(fut); + crate::tokio::spawn(fut); Ok::<_, hyper::Error>(cnt) }) .map(|_| ())