feat(client): Make client an optional feature

cc #2223

BREAKING CHANGE: The HTTP client of hyper is now an optional feature. To
  enable the client, add `features = ["client"]` to the dependency in
  your `Cargo.toml`.
This commit is contained in:
Sean McArthur
2020-11-17 17:06:25 -08:00
committed by GitHub
parent eb092a7b8c
commit 4e55583d30
19 changed files with 242 additions and 168 deletions

View File

@@ -74,11 +74,12 @@ pnet = "0.25.0"
default = [ default = [
"runtime", "runtime",
"stream", "stream",
"client",
"http1", "http1",
"http2", "http2",
] ]
full = [ full = [
"client",
"http1", "http1",
"http2", "http2",
"stream", "stream",
@@ -99,6 +100,8 @@ tcp = [
http1 = [] http1 = []
http2 = ["h2"] http2 = ["h2"]
client = []
# `impl Stream` for things # `impl Stream` for things
stream = [] stream = []

View File

@@ -6,6 +6,7 @@ use std::fmt;
use bytes::Bytes; use bytes::Bytes;
use futures_channel::mpsc; use futures_channel::mpsc;
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
use futures_channel::oneshot; use futures_channel::oneshot;
use futures_core::Stream; // for mpsc::Receiver use futures_core::Stream; // for mpsc::Receiver
#[cfg(feature = "stream")] #[cfg(feature = "stream")]
@@ -18,6 +19,7 @@ use super::DecodedLength;
use crate::common::sync_wrapper::SyncWrapper; use crate::common::sync_wrapper::SyncWrapper;
use crate::common::{task, watch, Pin, Poll}; use crate::common::{task, watch, Pin, Poll};
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
use crate::common::{Future, Never}; use crate::common::{Future, Never};
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
use crate::proto::h2::ping; use crate::proto::h2::ping;
@@ -72,16 +74,19 @@ struct Extra {
} }
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
type DelayEofUntil = oneshot::Receiver<Never>; type DelayEofUntil = oneshot::Receiver<Never>;
enum DelayEof { enum DelayEof {
/// Initial state, stream hasn't seen EOF yet. /// Initial state, stream hasn't seen EOF yet.
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
NotEof(DelayEofUntil), NotEof(DelayEofUntil),
/// Transitions to this state once we've seen `poll` try to /// Transitions to this state once we've seen `poll` try to
/// return EOF (`None`). This future is then polled, and /// return EOF (`None`). This future is then polled, and
/// when it completes, the Body finally returns EOF (`None`). /// when it completes, the Body finally returns EOF (`None`).
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
Eof(DelayEofUntil), Eof(DelayEofUntil),
} }
@@ -219,6 +224,7 @@ impl Body {
} }
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut)); self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
} }
@@ -242,6 +248,7 @@ impl Body {
fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
match self.take_delayed_eof() { match self.take_delayed_eof() {
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) { Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => { ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay)); self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
@@ -258,6 +265,7 @@ impl Body {
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
}, },
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) { Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) {
Poll::Ready(Ok(never)) => match never {}, Poll::Ready(Ok(never)) => match never {},
Poll::Pending => { Poll::Pending => {
@@ -266,7 +274,10 @@ impl Body {
} }
Poll::Ready(Err(_done)) => Poll::Ready(None), Poll::Ready(Err(_done)) => Poll::Ready(None),
}, },
#[cfg(not(any(feature = "http1", feature = "http2")))] #[cfg(any(
not(any(feature = "http1", feature = "http2")),
not(feature = "client")
))]
Some(delay_eof) => match delay_eof {}, Some(delay_eof) => match delay_eof {},
None => self.poll_inner(cx), None => self.poll_inner(cx),
} }

View File

@@ -1,37 +1,50 @@
macro_rules! cfg_any_http { macro_rules! cfg_feature {
($($item:item)*) => { (
#![$meta:meta]
$($item:item)*
) => {
$( $(
#[cfg(any( #[cfg($meta)]
feature = "http1", #[cfg_attr(docsrs, doc(cfg($meta)))]
feature = "http2",
))]
#[cfg_attr(docsrs, doc(cfg(any(
feature = "http1",
feature = "http2",
))))]
$item $item
)* )*
} }
} }
macro_rules! cfg_any_http {
($($item:item)*) => {
cfg_feature! {
#![any(feature = "http1", feature = "http2")]
$($item)*
}
}
}
cfg_any_http! { cfg_any_http! {
macro_rules! cfg_http1 { macro_rules! cfg_http1 {
($($item:item)*) => { ($($item:item)*) => {
$( cfg_feature! {
#[cfg(feature = "http1")] #![feature = "http1"]
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))] $($item)*
$item }
)*
} }
} }
macro_rules! cfg_http2 { macro_rules! cfg_http2 {
($($item:item)*) => { ($($item:item)*) => {
$( cfg_feature! {
#[cfg(feature = "http2")] #![feature = "http2"]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))] $($item)*
$item }
)* }
}
macro_rules! cfg_client {
($($item:item)*) => {
cfg_feature! {
#![feature = "client"]
$($item)*
}
} }
} }
} }

View File

@@ -25,7 +25,7 @@ use tower_service::Service;
use super::dispatch; use super::dispatch;
use crate::body::HttpBody; use crate::body::HttpBody;
use crate::common::{task, BoxSendFuture, Exec, Future, Pin, Poll}; use crate::common::{task, exec::{BoxSendFuture, Exec}, Future, Pin, Poll};
use crate::proto; use crate::proto;
use crate::rt::Executor; use crate::rt::Executor;
#[cfg(feature = "http1")] #[cfg(feature = "http1")]

View File

@@ -62,7 +62,7 @@ use http::{Method, Request, Response, Uri, Version};
use self::connect::{sealed::Connect, Alpn, Connected, Connection}; use self::connect::{sealed::Connect, Alpn, Connected, Connection};
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation}; use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
use crate::body::{Body, HttpBody}; use crate::body::{Body, HttpBody};
use crate::common::{lazy as hyper_lazy, task, BoxSendFuture, Future, Lazy, Pin, Poll}; use crate::common::{lazy as hyper_lazy, task, exec::BoxSendFuture, Future, Lazy, Pin, Poll};
use crate::rt::Executor; use crate::rt::Executor;
#[cfg(feature = "tcp")] #[cfg(feature = "tcp")]

View File

@@ -11,7 +11,7 @@ use futures_channel::oneshot;
use tokio::time::{Duration, Instant, Interval}; use tokio::time::{Duration, Instant, Interval};
use super::Ver; use super::Ver;
use crate::common::{task, Exec, Future, Pin, Poll, Unpin}; use crate::common::{task, exec::Exec, Future, Pin, Poll, Unpin};
// FIXME: allow() required due to `impl Trait` leaking types to this lint // FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@@ -777,7 +777,7 @@ mod tests {
use std::time::Duration; use std::time::Duration;
use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
use crate::common::{task, Exec, Future, Pin}; use crate::common::{task, exec::Exec, Future, Pin};
/// Test unique reservations. /// Test unique reservations.
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]

View File

@@ -16,6 +16,7 @@ pub(crate) mod drain;
pub(crate) mod exec; pub(crate) mod exec;
pub(crate) mod io; pub(crate) mod io;
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
mod lazy; mod lazy;
mod never; mod never;
#[cfg(feature = "stream")] #[cfg(feature = "stream")]
@@ -23,9 +24,10 @@ pub(crate) mod sync_wrapper;
pub(crate) mod task; pub(crate) mod task;
pub(crate) mod watch; pub(crate) mod watch;
//#[cfg(any(feature = "http1", feature = "http2"))]
//pub(crate) use self::exec::{BoxSendFuture, Exec};
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
pub(crate) use self::exec::{BoxSendFuture, Exec}; #[cfg(feature = "client")]
#[cfg(any(feature = "http1", feature = "http2"))]
pub(crate) use self::lazy::{lazy, Started as Lazy}; pub(crate) use self::lazy::{lazy, Started as Lazy};
pub use self::never::Never; pub use self::never::Never;
pub(crate) use self::task::Poll; pub(crate) use self::task::Poll;

View File

@@ -88,15 +88,18 @@ pub(crate) enum User {
UnexpectedHeader, UnexpectedHeader,
/// User tried to create a Request with bad version. /// User tried to create a Request with bad version.
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
UnsupportedVersion, UnsupportedVersion,
/// User tried to create a CONNECT Request with the Client. /// User tried to create a CONNECT Request with the Client.
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
UnsupportedRequestMethod, UnsupportedRequestMethod,
/// User tried to respond with a 1xx (not 101) response code. /// User tried to respond with a 1xx (not 101) response code.
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
UnsupportedStatusCode, UnsupportedStatusCode,
/// User tried to send a Request with Client with non-absolute URI. /// User tried to send a Request with Client with non-absolute URI.
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
AbsoluteUriRequired, AbsoluteUriRequired,
/// User tried polling for an upgrade that doesn't exist. /// User tried polling for an upgrade that doesn't exist.
@@ -241,6 +244,7 @@ impl Error {
} }
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
pub(crate) fn new_connect<E: Into<Cause>>(cause: E) -> Error { pub(crate) fn new_connect<E: Into<Cause>>(cause: E) -> Error {
Error::new(Kind::Connect).with(cause) Error::new(Kind::Connect).with(cause)
} }
@@ -273,11 +277,13 @@ impl Error {
} }
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
pub(crate) fn new_user_unsupported_version() -> Error { pub(crate) fn new_user_unsupported_version() -> Error {
Error::new_user(User::UnsupportedVersion) Error::new_user(User::UnsupportedVersion)
} }
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
pub(crate) fn new_user_unsupported_request_method() -> Error { pub(crate) fn new_user_unsupported_request_method() -> Error {
Error::new_user(User::UnsupportedRequestMethod) Error::new_user(User::UnsupportedRequestMethod)
} }
@@ -288,6 +294,7 @@ impl Error {
} }
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
pub(crate) fn new_user_absolute_uri_required() -> Error { pub(crate) fn new_user_absolute_uri_required() -> Error {
Error::new_user(User::AbsoluteUriRequired) Error::new_user(User::AbsoluteUriRequired)
} }
@@ -371,14 +378,17 @@ impl Error {
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
Kind::User(User::UnexpectedHeader) => "user sent unexpected header", Kind::User(User::UnexpectedHeader) => "user sent unexpected header",
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
Kind::User(User::UnsupportedVersion) => "request has unsupported HTTP version", Kind::User(User::UnsupportedVersion) => "request has unsupported HTTP version",
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
Kind::User(User::UnsupportedRequestMethod) => "request has unsupported HTTP method", Kind::User(User::UnsupportedRequestMethod) => "request has unsupported HTTP method",
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
Kind::User(User::UnsupportedStatusCode) => { Kind::User(User::UnsupportedStatusCode) => {
"response has 1xx status code, not supported by server" "response has 1xx status code, not supported by server"
} }
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
Kind::User(User::AbsoluteUriRequired) => "client requires absolute-form URIs", Kind::User(User::AbsoluteUriRequired) => "client requires absolute-form URIs",
Kind::User(User::NoUpgrade) => "no upgrade available", Kind::User(User::NoUpgrade) => "no upgrade available",
#[cfg(feature = "http1")] #[cfg(feature = "http1")]

View File

@@ -3,7 +3,8 @@ use bytes::BytesMut;
use http::header::CONTENT_LENGTH; use http::header::CONTENT_LENGTH;
use http::header::{HeaderValue, ValueIter}; use http::header::{HeaderValue, ValueIter};
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
use http::method::Method; #[cfg(feature = "client")]
use http::Method;
use http::HeaderMap; use http::HeaderMap;
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
@@ -65,6 +66,7 @@ pub fn content_length_parse_all_values(values: ValueIter<'_, HeaderValue>) -> Op
} }
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
#[cfg(feature = "client")]
pub fn method_has_defined_payload_semantics(method: &Method) -> bool { pub fn method_has_defined_payload_semantics(method: &Method) -> bool {
match *method { match *method {
Method::GET | Method::HEAD | Method::DELETE | Method::CONNECT => false, Method::GET | Method::HEAD | Method::DELETE | Method::CONNECT => false,

View File

@@ -71,9 +71,14 @@ pub mod upgrade;
cfg_any_http! { cfg_any_http! {
mod headers; mod headers;
mod proto; mod proto;
pub mod client;
pub mod server; pub mod server;
pub use crate::client::Client;
pub use crate::server::Server; pub use crate::server::Server;
} }
cfg_feature! {
#![all(feature = "client", any(feature = "http1", feature = "http2"))]
pub mod client;
pub use crate::client::Client;
}

View File

@@ -1,5 +1,5 @@
use std::fmt; use std::fmt;
use std::io::{self}; use std::io;
use std::marker::PhantomData; use std::marker::PhantomData;
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
@@ -65,6 +65,7 @@ where
self.io.set_max_buf_size(max); self.io.set_max_buf_size(max);
} }
#[cfg(feature = "client")]
pub fn set_read_buf_exact_size(&mut self, sz: usize) { pub fn set_read_buf_exact_size(&mut self, sz: usize) {
self.io.set_read_buf_exact_size(sz); self.io.set_read_buf_exact_size(sz);
} }
@@ -77,6 +78,7 @@ where
self.io.set_write_strategy_queue(); self.io.set_write_strategy_queue();
} }
#[cfg(feature = "client")]
pub fn set_title_case_headers(&mut self) { pub fn set_title_case_headers(&mut self) {
self.state.title_case_headers = true; self.state.title_case_headers = true;
} }

View File

@@ -1,15 +1,14 @@
use std::error::Error as StdError; use std::error::Error as StdError;
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use http::{Request, Response, StatusCode}; use http::{Request, StatusCode};
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use super::{Http1Transaction, Wants}; use super::{Http1Transaction, Wants};
use crate::body::{Body, DecodedLength, HttpBody}; use crate::body::{Body, DecodedLength, HttpBody};
use crate::common::{task, Future, Never, Pin, Poll, Unpin}; use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::proto::{ use crate::proto::{
BodyLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, BodyLength, Conn, Dispatched, MessageHead, RequestHead,
ResponseHead,
}; };
use crate::service::HttpService; use crate::service::HttpService;
@@ -40,15 +39,17 @@ pub struct Server<S: HttpService<B>, B> {
pub(crate) service: S, pub(crate) service: S,
} }
#[pin_project::pin_project] cfg_client! {
pub struct Client<B> { #[pin_project::pin_project]
callback: Option<crate::client::dispatch::Callback<Request<B>, Response<Body>>>, pub struct Client<B> {
#[pin] callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
rx: ClientRx<B>, #[pin]
rx_closed: bool, rx: ClientRx<B>,
} rx_closed: bool,
}
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>; type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
}
impl<D, Bs, I, T> Dispatcher<D, Bs, I, T> impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
where where
@@ -523,115 +524,117 @@ where
// ===== impl Client ===== // ===== impl Client =====
impl<B> Client<B> { cfg_client! {
pub fn new(rx: ClientRx<B>) -> Client<B> { impl<B> Client<B> {
Client { pub fn new(rx: ClientRx<B>) -> Client<B> {
callback: None, Client {
rx, callback: None,
rx_closed: false, rx,
} rx_closed: false,
}
}
impl<B> Dispatch for Client<B>
where
B: HttpBody,
{
type PollItem = RequestHead;
type PollBody = B;
type PollError = Never;
type RecvItem = ResponseHead;
fn poll_msg(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Never>>> {
let this = self.project();
debug_assert!(!*this.rx_closed);
match this.rx.poll_next(cx) {
Poll::Ready(Some((req, mut cb))) => {
// check that future hasn't been canceled already
match cb.poll_canceled(cx) {
Poll::Ready(()) => {
trace!("request canceled");
Poll::Ready(None)
}
Poll::Pending => {
let (parts, body) = req.into_parts();
let head = RequestHead {
version: parts.version,
subject: RequestLine(parts.method, parts.uri),
headers: parts.headers,
};
*this.callback = Some(cb);
Poll::Ready(Some(Ok((head, body))))
}
}
} }
Poll::Ready(None) => {
// user has dropped sender handle
trace!("client tx closed");
*this.rx_closed = true;
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
} }
} }
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> { impl<B> Dispatch for Client<B>
match msg { where
Ok((msg, body)) => { B: HttpBody,
if let Some(cb) = self.callback.take() { {
let mut res = Response::new(body); type PollItem = RequestHead;
*res.status_mut() = msg.subject; type PollBody = B;
*res.headers_mut() = msg.headers; type PollError = crate::common::Never;
*res.version_mut() = msg.version; type RecvItem = crate::proto::ResponseHead;
cb.send(Ok(res));
Ok(()) fn poll_msg(
} else { self: Pin<&mut Self>,
// Getting here is likely a bug! An error should have happened cx: &mut task::Context<'_>,
// in Conn::require_empty_read() before ever parsing a ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), crate::common::Never>>> {
// full message! let this = self.project();
Err(crate::Error::new_unexpected_message()) debug_assert!(!*this.rx_closed);
match this.rx.poll_next(cx) {
Poll::Ready(Some((req, mut cb))) => {
// check that future hasn't been canceled already
match cb.poll_canceled(cx) {
Poll::Ready(()) => {
trace!("request canceled");
Poll::Ready(None)
}
Poll::Pending => {
let (parts, body) = req.into_parts();
let head = RequestHead {
version: parts.version,
subject: crate::proto::RequestLine(parts.method, parts.uri),
headers: parts.headers,
};
*this.callback = Some(cb);
Poll::Ready(Some(Ok((head, body))))
}
}
} }
Poll::Ready(None) => {
// user has dropped sender handle
trace!("client tx closed");
*this.rx_closed = true;
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
} }
Err(err) => { }
if let Some(cb) = self.callback.take() {
cb.send(Err((err, None))); fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
Ok(()) match msg {
} else if !self.rx_closed { Ok((msg, body)) => {
self.rx.close(); if let Some(cb) = self.callback.take() {
if let Some((req, cb)) = self.rx.try_recv() { let mut res = http::Response::new(body);
trace!("canceling queued request with connection error: {}", err); *res.status_mut() = msg.subject;
// in this case, the message was never even started, so it's safe to tell *res.headers_mut() = msg.headers;
// the user that the request was completely canceled *res.version_mut() = msg.version;
cb.send(Err((crate::Error::new_canceled().with(err), Some(req)))); cb.send(Ok(res));
Ok(()) Ok(())
} else {
// Getting here is likely a bug! An error should have happened
// in Conn::require_empty_read() before ever parsing a
// full message!
Err(crate::Error::new_unexpected_message())
}
}
Err(err) => {
if let Some(cb) = self.callback.take() {
cb.send(Err((err, None)));
Ok(())
} else if !self.rx_closed {
self.rx.close();
if let Some((req, cb)) = self.rx.try_recv() {
trace!("canceling queued request with connection error: {}", err);
// in this case, the message was never even started, so it's safe to tell
// the user that the request was completely canceled
cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
Ok(())
} else {
Err(err)
}
} else { } else {
Err(err) Err(err)
} }
} else {
Err(err)
} }
} }
} }
}
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> { fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
match self.callback { match self.callback {
Some(ref mut cb) => match cb.poll_canceled(cx) { Some(ref mut cb) => match cb.poll_canceled(cx) {
Poll::Ready(()) => { Poll::Ready(()) => {
trace!("callback receiver has dropped"); trace!("callback receiver has dropped");
Poll::Ready(Err(())) Poll::Ready(Err(()))
} }
Poll::Pending => Poll::Ready(Ok(())), Poll::Pending => Poll::Ready(Ok(())),
}, },
None => Poll::Ready(Err(())), None => Poll::Ready(Err(())),
}
} }
}
fn should_poll(&self) -> bool { fn should_poll(&self) -> bool {
self.callback.is_none() self.callback.is_none()
}
} }
} }

View File

@@ -84,6 +84,7 @@ where
self.write_buf.max_buf_size = max; self.write_buf.max_buf_size = max;
} }
#[cfg(feature = "client")]
pub fn set_read_buf_exact_size(&mut self, sz: usize) { pub fn set_read_buf_exact_size(&mut self, sz: usize) {
self.read_buf_strategy = ReadStrategy::Exact(sz); self.read_buf_strategy = ReadStrategy::Exact(sz);
} }
@@ -317,6 +318,7 @@ enum ReadStrategy {
next: usize, next: usize,
max: usize, max: usize,
}, },
#[cfg(feature = "client")]
Exact(usize), Exact(usize),
} }
@@ -332,6 +334,7 @@ impl ReadStrategy {
fn next(&self) -> usize { fn next(&self) -> usize {
match *self { match *self {
ReadStrategy::Adaptive { next, .. } => next, ReadStrategy::Adaptive { next, .. } => next,
#[cfg(feature = "client")]
ReadStrategy::Exact(exact) => exact, ReadStrategy::Exact(exact) => exact,
} }
} }
@@ -339,38 +342,42 @@ impl ReadStrategy {
fn max(&self) -> usize { fn max(&self) -> usize {
match *self { match *self {
ReadStrategy::Adaptive { max, .. } => max, ReadStrategy::Adaptive { max, .. } => max,
#[cfg(feature = "client")]
ReadStrategy::Exact(exact) => exact, ReadStrategy::Exact(exact) => exact,
} }
} }
fn record(&mut self, bytes_read: usize) { fn record(&mut self, bytes_read: usize) {
if let ReadStrategy::Adaptive { match *self {
ref mut decrease_now, ReadStrategy::Adaptive {
ref mut next, ref mut decrease_now,
max, ref mut next,
.. max,
} = *self ..
{ } => {
if bytes_read >= *next { if bytes_read >= *next {
*next = cmp::min(incr_power_of_two(*next), max); *next = cmp::min(incr_power_of_two(*next), max);
*decrease_now = false;
} else {
let decr_to = prev_power_of_two(*next);
if bytes_read < decr_to {
if *decrease_now {
*next = cmp::max(decr_to, INIT_BUFFER_SIZE);
*decrease_now = false;
} else {
// Decreasing is a two "record" process.
*decrease_now = true;
}
} else {
// A read within the current range should cancel
// a potential decrease, since we just saw proof
// that we still need this size.
*decrease_now = false; *decrease_now = false;
} else {
let decr_to = prev_power_of_two(*next);
if bytes_read < decr_to {
if *decrease_now {
*next = cmp::max(decr_to, INIT_BUFFER_SIZE);
*decrease_now = false;
} else {
// Decreasing is a two "record" process.
*decrease_now = true;
}
} else {
// A read within the current range should cancel
// a potential decrease, since we just saw proof
// that we still need this size.
*decrease_now = false;
}
} }
} },
#[cfg(feature = "client")]
ReadStrategy::Exact(_) => (),
} }
} }
} }

View File

@@ -19,7 +19,10 @@ mod io;
mod role; mod role;
pub(crate) type ServerTransaction = role::Server; pub(crate) type ServerTransaction = role::Server;
pub(crate) type ClientTransaction = role::Client;
cfg_client! {
pub(crate) type ClientTransaction = role::Client;
}
pub(crate) trait Http1Transaction { pub(crate) trait Http1Transaction {
type Incoming; type Incoming;

View File

@@ -10,7 +10,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use super::{decode_content_length, ping, PipeToSendStream, SendBuf}; use super::{decode_content_length, ping, PipeToSendStream, SendBuf};
use crate::body::HttpBody; use crate::body::HttpBody;
use crate::common::{task, Exec, Future, Never, Pin, Poll}; use crate::common::{task, exec::Exec, Future, Never, Pin, Poll};
use crate::headers; use crate::headers;
use crate::proto::Dispatched; use crate::proto::Dispatched;
use crate::{Body, Request, Response}; use crate::{Body, Request, Response};

View File

@@ -13,11 +13,14 @@ use crate::body::{DecodedLength, HttpBody};
use crate::common::{task, Future, Pin, Poll}; use crate::common::{task, Future, Pin, Poll};
use crate::headers::content_length_parse_all; use crate::headers::content_length_parse_all;
pub(crate) mod client;
pub(crate) mod ping; pub(crate) mod ping;
pub(crate) mod server; pub(crate) mod server;
pub(crate) use self::client::ClientTask; cfg_client! {
pub(crate) mod client;
pub(crate) use self::client::ClientTask;
}
pub(crate) use self::server::Server; pub(crate) use self::server::Server;
/// Default initial stream window size defined in HTTP2 spec. /// Default initial stream window size defined in HTTP2 spec.

View File

@@ -236,6 +236,7 @@ impl Recorder {
/// If the incoming stream is already closed, convert self into /// If the incoming stream is already closed, convert self into
/// a disabled reporter. /// a disabled reporter.
#[cfg(feature = "client")]
pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self { pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
if stream.is_end_stream() { if stream.is_end_stream() {
disabled() disabled()

View File

@@ -3,7 +3,10 @@
cfg_http1! { cfg_http1! {
pub(crate) mod h1; pub(crate) mod h1;
pub(crate) use self::h1::{dispatch, Conn, ServerTransaction}; pub(crate) use self::h1::{Conn, ServerTransaction};
#[cfg(feature = "client")]
pub(crate) use self::h1::dispatch;
} }
cfg_http2! { cfg_http2! {
@@ -31,6 +34,7 @@ pub struct RequestLine(pub http::Method, pub http::Uri);
/// An incoming response message. /// An incoming response message.
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
#[cfg(feature = "client")]
pub type ResponseHead = MessageHead<http::StatusCode>; pub type ResponseHead = MessageHead<http::StatusCode>;
#[derive(Debug)] #[derive(Debug)]

View File

@@ -40,13 +40,18 @@ pub use tower_service::Service;
mod http; mod http;
mod make; mod make;
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
mod oneshot; mod oneshot;
mod util; mod util;
pub(crate) use self::http::HttpService; pub(crate) use self::http::HttpService;
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
pub(crate) use self::make::{MakeConnection, MakeServiceRef}; #[cfg(feature = "client")]
pub(crate) use self::make::MakeConnection;
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
pub(crate) use self::make::MakeServiceRef;
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
pub(crate) use self::oneshot::{oneshot, Oneshot}; pub(crate) use self::oneshot::{oneshot, Oneshot};
pub use self::make::make_service_fn; pub use self::make::make_service_fn;