feat(client): add ALPN h2 support for client connectors
- Adds `Connected::negotiated_h2()` method to signal the connection must use HTTP2. `Connect` implementations should set this if using ALPN. If a connection to a host is detected to have been upgraded via ALPN, any other oustanding connect futures will be canceled, and the waiting requests will make use of the single HTTP2 connection. The `http2_only` builder configuration still works the same, not requiring ALPN at all, and always using only a single connection.
This commit is contained in:
@@ -466,6 +466,7 @@ impl Builder {
|
|||||||
T: AsyncRead + AsyncWrite + Send + 'static,
|
T: AsyncRead + AsyncWrite + Send + 'static,
|
||||||
B: Payload + 'static,
|
B: Payload + 'static,
|
||||||
{
|
{
|
||||||
|
trace!("client handshake HTTP/{}", if self.http2 { 2 } else { 1 });
|
||||||
Handshake {
|
Handshake {
|
||||||
builder: self.clone(),
|
builder: self.clone(),
|
||||||
io: Some(io),
|
io: Some(io),
|
||||||
|
|||||||
@@ -36,7 +36,6 @@ pub trait Connect: Send + Sync {
|
|||||||
/// A set of properties to describe where and how to try to connect.
|
/// A set of properties to describe where and how to try to connect.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Destination {
|
pub struct Destination {
|
||||||
//pub(super) alpn: Alpn,
|
|
||||||
pub(super) uri: Uri,
|
pub(super) uri: Uri,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -46,21 +45,18 @@ pub struct Destination {
|
|||||||
/// was used, or if connected to an HTTP proxy.
|
/// was used, or if connected to an HTTP proxy.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Connected {
|
pub struct Connected {
|
||||||
//alpn: Alpn,
|
pub(super) alpn: Alpn,
|
||||||
pub(super) is_proxied: bool,
|
pub(super) is_proxied: bool,
|
||||||
pub(super) extra: Option<Extra>,
|
pub(super) extra: Option<Extra>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) struct Extra(Box<ExtraInner>);
|
pub(super) struct Extra(Box<ExtraInner>);
|
||||||
|
|
||||||
/*TODO: when HTTP1 Upgrades to H2 are added, this will be needed
|
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||||
#[derive(Debug)]
|
|
||||||
pub(super) enum Alpn {
|
pub(super) enum Alpn {
|
||||||
Http1,
|
H2,
|
||||||
//H2,
|
None,
|
||||||
//Http1OrH2
|
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
impl Destination {
|
impl Destination {
|
||||||
/// Get the protocol scheme.
|
/// Get the protocol scheme.
|
||||||
@@ -246,7 +242,7 @@ impl Connected {
|
|||||||
/// Create new `Connected` type with empty metadata.
|
/// Create new `Connected` type with empty metadata.
|
||||||
pub fn new() -> Connected {
|
pub fn new() -> Connected {
|
||||||
Connected {
|
Connected {
|
||||||
//alpn: Alpn::Http1,
|
alpn: Alpn::None,
|
||||||
is_proxied: false,
|
is_proxied: false,
|
||||||
extra: None,
|
extra: None,
|
||||||
}
|
}
|
||||||
@@ -274,19 +270,18 @@ impl Connected {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
/// Set that the connected transport negotiated HTTP/2 as it's
|
/// Set that the connected transport negotiated HTTP/2 as it's
|
||||||
/// next protocol.
|
/// next protocol.
|
||||||
pub fn h2(mut self) -> Connected {
|
pub fn negotiated_h2(mut self) -> Connected {
|
||||||
self.alpn = Alpn::H2;
|
self.alpn = Alpn::H2;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
// Don't public expose that `Connected` is `Clone`, unsure if we want to
|
// Don't public expose that `Connected` is `Clone`, unsure if we want to
|
||||||
// keep that contract...
|
// keep that contract...
|
||||||
pub(super) fn clone(&self) -> Connected {
|
pub(super) fn clone(&self) -> Connected {
|
||||||
Connected {
|
Connected {
|
||||||
|
alpn: self.alpn.clone(),
|
||||||
is_proxied: self.is_proxied,
|
is_proxied: self.is_proxied,
|
||||||
extra: self.extra.clone(),
|
extra: self.extra.clone(),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -86,12 +86,12 @@ use futures::{Async, Future, Poll};
|
|||||||
use futures::future::{self, Either, Executor};
|
use futures::future::{self, Either, Executor};
|
||||||
use futures::sync::oneshot;
|
use futures::sync::oneshot;
|
||||||
use http::{Method, Request, Response, Uri, Version};
|
use http::{Method, Request, Response, Uri, Version};
|
||||||
use http::header::{Entry, HeaderValue, HOST};
|
use http::header::{HeaderValue, HOST};
|
||||||
use http::uri::Scheme;
|
use http::uri::Scheme;
|
||||||
|
|
||||||
use body::{Body, Payload};
|
use body::{Body, Payload};
|
||||||
use common::{Exec, lazy as hyper_lazy, Lazy};
|
use common::{Exec, lazy as hyper_lazy, Lazy};
|
||||||
use self::connect::{Connect, Connected, Destination};
|
use self::connect::{Alpn, Connect, Connected, Destination};
|
||||||
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
|
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
|
||||||
|
|
||||||
#[cfg(feature = "runtime")] pub use self::connect::HttpConnector;
|
#[cfg(feature = "runtime")] pub use self::connect::HttpConnector;
|
||||||
@@ -192,23 +192,19 @@ where C: Connect + Sync + 'static,
|
|||||||
|
|
||||||
/// Send a constructed Request using this Client.
|
/// Send a constructed Request using this Client.
|
||||||
pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
|
pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
|
||||||
let is_http_11 = self.ver == Ver::Http1 && match req.version() {
|
let is_http_connect = req.method() == &Method::CONNECT;
|
||||||
Version::HTTP_11 => true,
|
match req.version() {
|
||||||
Version::HTTP_10 => false,
|
Version::HTTP_11 => (),
|
||||||
other => {
|
Version::HTTP_10 => if is_http_connect {
|
||||||
|
debug!("CONNECT is not allowed for HTTP/1.0");
|
||||||
|
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method())));
|
||||||
|
},
|
||||||
|
other => if self.ver != Ver::Http2 {
|
||||||
error!("Request has unsupported version \"{:?}\"", other);
|
error!("Request has unsupported version \"{:?}\"", other);
|
||||||
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_version())));
|
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_version())));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let is_http_connect = req.method() == &Method::CONNECT;
|
|
||||||
|
|
||||||
if !is_http_11 && is_http_connect {
|
|
||||||
debug!("client does not support CONNECT requests for {:?}", req.version());
|
|
||||||
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method())));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
let uri = req.uri().clone();
|
let uri = req.uri().clone();
|
||||||
let domain = match (uri.scheme_part(), uri.authority_part()) {
|
let domain = match (uri.scheme_part(), uri.authority_part()) {
|
||||||
(Some(scheme), Some(auth)) => {
|
(Some(scheme), Some(auth)) => {
|
||||||
@@ -233,21 +229,7 @@ where C: Connect + Sync + 'static,
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if self.set_host && self.ver == Ver::Http1 {
|
let pool_key = Arc::new(domain.to_string());
|
||||||
if let Entry::Vacant(entry) = req.headers_mut().entry(HOST).expect("HOST is always valid header name") {
|
|
||||||
let hostname = uri.host().expect("authority implies host");
|
|
||||||
let host = if let Some(port) = uri.port() {
|
|
||||||
let s = format!("{}:{}", hostname, port);
|
|
||||||
HeaderValue::from_str(&s)
|
|
||||||
} else {
|
|
||||||
HeaderValue::from_str(hostname)
|
|
||||||
}.expect("uri host is valid header value");
|
|
||||||
entry.insert(host);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
let pool_key = (Arc::new(domain.to_string()), self.ver);
|
|
||||||
ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key)))
|
ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key)))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -283,11 +265,28 @@ where C: Connect + Sync + 'static,
|
|||||||
fn send_request(&self, mut req: Request<B>, pool_key: PoolKey) -> impl Future<Item=Response<Body>, Error=ClientError<B>> {
|
fn send_request(&self, mut req: Request<B>, pool_key: PoolKey) -> impl Future<Item=Response<Body>, Error=ClientError<B>> {
|
||||||
let conn = self.connection_for(req.uri().clone(), pool_key);
|
let conn = self.connection_for(req.uri().clone(), pool_key);
|
||||||
|
|
||||||
let ver = self.ver;
|
let set_host = self.set_host;
|
||||||
let executor = self.executor.clone();
|
let executor = self.executor.clone();
|
||||||
conn.and_then(move |mut pooled| {
|
conn.and_then(move |mut pooled| {
|
||||||
if ver == Ver::Http1 {
|
if pooled.is_http1() {
|
||||||
// CONNECT always sends origin-form, so check it first...
|
if set_host {
|
||||||
|
let uri = req.uri().clone();
|
||||||
|
req
|
||||||
|
.headers_mut()
|
||||||
|
.entry(HOST)
|
||||||
|
.expect("HOST is always valid header name")
|
||||||
|
.or_insert_with(|| {
|
||||||
|
let hostname = uri.host().expect("authority implies host");
|
||||||
|
if let Some(port) = uri.port() {
|
||||||
|
let s = format!("{}:{}", hostname, port);
|
||||||
|
HeaderValue::from_str(&s)
|
||||||
|
} else {
|
||||||
|
HeaderValue::from_str(hostname)
|
||||||
|
}.expect("uri host is valid header value")
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// CONNECT always sends authority-form, so check it first...
|
||||||
if req.method() == &Method::CONNECT {
|
if req.method() == &Method::CONNECT {
|
||||||
authority_form(req.uri_mut());
|
authority_form(req.uri_mut());
|
||||||
} else if pooled.conn_info.is_proxied {
|
} else if pooled.conn_info.is_proxied {
|
||||||
@@ -295,11 +294,9 @@ where C: Connect + Sync + 'static,
|
|||||||
} else {
|
} else {
|
||||||
origin_form(req.uri_mut());
|
origin_form(req.uri_mut());
|
||||||
};
|
};
|
||||||
} else {
|
} else if req.method() == &Method::CONNECT {
|
||||||
debug_assert!(
|
debug!("client does not support CONNECT requests over HTTP2");
|
||||||
req.method() != &Method::CONNECT,
|
return Either::A(future::err(ClientError::Normal(::Error::new_user_unsupported_request_method())));
|
||||||
"Client should have returned Error for HTTP2 CONNECT"
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let fut = pooled.send_request_retryable(req)
|
let fut = pooled.send_request_retryable(req)
|
||||||
@@ -322,10 +319,10 @@ where C: Connect + Sync + 'static,
|
|||||||
// To counteract this, we must check if our senders 'want' channel
|
// To counteract this, we must check if our senders 'want' channel
|
||||||
// has been closed after having tried to send. If so, error out...
|
// has been closed after having tried to send. If so, error out...
|
||||||
if pooled.is_closed() {
|
if pooled.is_closed() {
|
||||||
return Either::A(fut);
|
return Either::B(Either::A(fut));
|
||||||
}
|
}
|
||||||
|
|
||||||
Either::B(fut
|
Either::B(Either::B(fut
|
||||||
.and_then(move |mut res| {
|
.and_then(move |mut res| {
|
||||||
// If pooled is HTTP/2, we can toss this reference immediately.
|
// If pooled is HTTP/2, we can toss this reference immediately.
|
||||||
//
|
//
|
||||||
@@ -337,7 +334,7 @@ where C: Connect + Sync + 'static,
|
|||||||
// for a new request to start.
|
// for a new request to start.
|
||||||
//
|
//
|
||||||
// It won't be ready if there is a body to stream.
|
// It won't be ready if there is a body to stream.
|
||||||
if ver == Ver::Http2 || !pooled.is_pool_enabled() || pooled.is_ready() {
|
if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
|
||||||
drop(pooled);
|
drop(pooled);
|
||||||
} else if !res.body().is_end_stream() {
|
} else if !res.body().is_end_stream() {
|
||||||
let (delayed_tx, delayed_rx) = oneshot::channel();
|
let (delayed_tx, delayed_rx) = oneshot::channel();
|
||||||
@@ -370,7 +367,7 @@ where C: Connect + Sync + 'static,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}))
|
})))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -463,8 +460,9 @@ where C: Connect + Sync + 'static,
|
|||||||
let pool = self.pool.clone();
|
let pool = self.pool.clone();
|
||||||
let h1_writev = self.h1_writev;
|
let h1_writev = self.h1_writev;
|
||||||
let h1_title_case_headers = self.h1_title_case_headers;
|
let h1_title_case_headers = self.h1_title_case_headers;
|
||||||
|
let ver = self.ver;
|
||||||
|
let is_ver_h2 = self.ver == Ver::Http2;
|
||||||
let connector = self.connector.clone();
|
let connector = self.connector.clone();
|
||||||
let ver = pool_key.1;
|
|
||||||
let dst = Destination {
|
let dst = Destination {
|
||||||
uri,
|
uri,
|
||||||
};
|
};
|
||||||
@@ -474,7 +472,7 @@ where C: Connect + Sync + 'static,
|
|||||||
// If the pool_key is for HTTP/2, and there is already a
|
// If the pool_key is for HTTP/2, and there is already a
|
||||||
// connection being estabalished, then this can't take a
|
// connection being estabalished, then this can't take a
|
||||||
// second lock. The "connect_to" future is Canceled.
|
// second lock. The "connect_to" future is Canceled.
|
||||||
let connecting = match pool.connecting(&pool_key) {
|
let connecting = match pool.connecting(&pool_key, ver) {
|
||||||
Some(lock) => lock,
|
Some(lock) => lock,
|
||||||
None => {
|
None => {
|
||||||
let canceled = ::Error::new_canceled(Some("HTTP/2 connection in progress"));
|
let canceled = ::Error::new_canceled(Some("HTTP/2 connection in progress"));
|
||||||
@@ -484,11 +482,31 @@ where C: Connect + Sync + 'static,
|
|||||||
Either::A(connector.connect(dst)
|
Either::A(connector.connect(dst)
|
||||||
.map_err(::Error::new_connect)
|
.map_err(::Error::new_connect)
|
||||||
.and_then(move |(io, connected)| {
|
.and_then(move |(io, connected)| {
|
||||||
conn::Builder::new()
|
// If ALPN is h2 and we aren't http2_only already,
|
||||||
|
// then we need to convert our pool checkout into
|
||||||
|
// a single HTTP2 one.
|
||||||
|
let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
|
||||||
|
match connecting.alpn_h2(&pool) {
|
||||||
|
Some(lock) => {
|
||||||
|
trace!("ALPN negotiated h2, updating pool");
|
||||||
|
lock
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
// Another connection has already upgraded,
|
||||||
|
// the pool checkout should finish up for us.
|
||||||
|
let canceled = ::Error::new_canceled(Some("ALPN upgraded to HTTP/2"));
|
||||||
|
return Either::B(future::err(canceled));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
connecting
|
||||||
|
};
|
||||||
|
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
|
||||||
|
Either::A(conn::Builder::new()
|
||||||
.exec(executor.clone())
|
.exec(executor.clone())
|
||||||
.h1_writev(h1_writev)
|
.h1_writev(h1_writev)
|
||||||
.h1_title_case_headers(h1_title_case_headers)
|
.h1_title_case_headers(h1_title_case_headers)
|
||||||
.http2_only(pool_key.1 == Ver::Http2)
|
.http2_only(is_h2)
|
||||||
.handshake(io)
|
.handshake(io)
|
||||||
.and_then(move |(tx, conn)| {
|
.and_then(move |(tx, conn)| {
|
||||||
let bg = executor.execute(conn.map_err(|e| {
|
let bg = executor.execute(conn.map_err(|e| {
|
||||||
@@ -509,12 +527,13 @@ where C: Connect + Sync + 'static,
|
|||||||
.map(move |tx| {
|
.map(move |tx| {
|
||||||
pool.pooled(connecting, PoolClient {
|
pool.pooled(connecting, PoolClient {
|
||||||
conn_info: connected,
|
conn_info: connected,
|
||||||
tx: match ver {
|
tx: if is_h2 {
|
||||||
Ver::Http1 => PoolTx::Http1(tx),
|
PoolTx::Http2(tx.into_http2())
|
||||||
Ver::Http2 => PoolTx::Http2(tx.into_http2()),
|
} else {
|
||||||
|
PoolTx::Http1(tx)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
})
|
}))
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -591,6 +610,17 @@ impl<B> PoolClient<B> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_http1(&self) -> bool {
|
||||||
|
!self.is_http2()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_http2(&self) -> bool {
|
||||||
|
match self.tx {
|
||||||
|
PoolTx::Http1(_) => false,
|
||||||
|
PoolTx::Http2(_) => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn is_ready(&self) -> bool {
|
fn is_ready(&self) -> bool {
|
||||||
match self.tx {
|
match self.tx {
|
||||||
PoolTx::Http1(ref tx) => tx.is_ready(),
|
PoolTx::Http1(ref tx) => tx.is_ready(),
|
||||||
@@ -650,6 +680,10 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn can_share(&self) -> bool {
|
||||||
|
self.is_http2()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ pub(super) trait Poolable: Send + Sized + 'static {
|
|||||||
///
|
///
|
||||||
/// Allows for HTTP/2 to return a shared reservation.
|
/// Allows for HTTP/2 to return a shared reservation.
|
||||||
fn reserve(self) -> Reservation<Self>;
|
fn reserve(self) -> Reservation<Self>;
|
||||||
|
fn can_share(&self) -> bool;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// When checking out a pooled connection, it might be that the connection
|
/// When checking out a pooled connection, it might be that the connection
|
||||||
@@ -50,7 +51,7 @@ pub(super) enum Reservation<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Simple type alias in case the key type needs to be adjusted.
|
/// Simple type alias in case the key type needs to be adjusted.
|
||||||
pub(super) type Key = (Arc<String>, Ver);
|
pub(super) type Key = Arc<String>;
|
||||||
|
|
||||||
struct PoolInner<T> {
|
struct PoolInner<T> {
|
||||||
// A flag that a connection is being estabilished, and the connection
|
// A flag that a connection is being estabilished, and the connection
|
||||||
@@ -151,8 +152,8 @@ impl<T: Poolable> Pool<T> {
|
|||||||
|
|
||||||
/// Ensure that there is only ever 1 connecting task for HTTP/2
|
/// Ensure that there is only ever 1 connecting task for HTTP/2
|
||||||
/// connections. This does nothing for HTTP/1.
|
/// connections. This does nothing for HTTP/1.
|
||||||
pub(super) fn connecting(&self, key: &Key) -> Option<Connecting<T>> {
|
pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
|
||||||
if key.1 == Ver::Http2 {
|
if ver == Ver::Http2 {
|
||||||
if let Some(ref enabled) = self.inner {
|
if let Some(ref enabled) = self.inner {
|
||||||
let mut inner = enabled.lock().unwrap();
|
let mut inner = enabled.lock().unwrap();
|
||||||
return if inner.connecting.insert(key.clone()) {
|
return if inner.connecting.insert(key.clone()) {
|
||||||
@@ -162,7 +163,7 @@ impl<T: Poolable> Pool<T> {
|
|||||||
};
|
};
|
||||||
Some(connecting)
|
Some(connecting)
|
||||||
} else {
|
} else {
|
||||||
trace!("HTTP/2 connecting already in progress for {:?}", key.0);
|
trace!("HTTP/2 connecting already in progress for {:?}", key);
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -190,7 +191,7 @@ impl<T: Poolable> Pool<T> {
|
|||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(super) fn h1_key(&self, s: &str) -> Key {
|
pub(super) fn h1_key(&self, s: &str) -> Key {
|
||||||
(Arc::new(s.to_string()), Ver::Http1)
|
Arc::new(s.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
@@ -243,11 +244,6 @@ impl<T: Poolable> Pool<T> {
|
|||||||
let (value, pool_ref) = if let Some(ref enabled) = self.inner {
|
let (value, pool_ref) = if let Some(ref enabled) = self.inner {
|
||||||
match value.reserve() {
|
match value.reserve() {
|
||||||
Reservation::Shared(to_insert, to_return) => {
|
Reservation::Shared(to_insert, to_return) => {
|
||||||
debug_assert_eq!(
|
|
||||||
connecting.key.1,
|
|
||||||
Ver::Http2,
|
|
||||||
"shared reservation without Http2"
|
|
||||||
);
|
|
||||||
let mut inner = enabled.lock().unwrap();
|
let mut inner = enabled.lock().unwrap();
|
||||||
inner.put(connecting.key.clone(), to_insert, enabled);
|
inner.put(connecting.key.clone(), to_insert, enabled);
|
||||||
// Do this here instead of Drop for Connecting because we
|
// Do this here instead of Drop for Connecting because we
|
||||||
@@ -294,7 +290,7 @@ impl<T: Poolable> Pool<T> {
|
|||||||
// unique or shared. So, the hack is to just assume Ver::Http2 means
|
// unique or shared. So, the hack is to just assume Ver::Http2 means
|
||||||
// shared... :(
|
// shared... :(
|
||||||
let mut pool_ref = WeakOpt::none();
|
let mut pool_ref = WeakOpt::none();
|
||||||
if key.1 == Ver::Http1 {
|
if !value.can_share() {
|
||||||
if let Some(ref enabled) = self.inner {
|
if let Some(ref enabled) = self.inner {
|
||||||
pool_ref = WeakOpt::downgrade(enabled);
|
pool_ref = WeakOpt::downgrade(enabled);
|
||||||
}
|
}
|
||||||
@@ -377,7 +373,7 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
|
|||||||
|
|
||||||
impl<T: Poolable> PoolInner<T> {
|
impl<T: Poolable> PoolInner<T> {
|
||||||
fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
|
fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
|
||||||
if key.1 == Ver::Http2 && self.idle.contains_key(&key) {
|
if value.can_share() && self.idle.contains_key(&key) {
|
||||||
trace!("put; existing idle HTTP/2 connection for {:?}", key);
|
trace!("put; existing idle HTTP/2 connection for {:?}", key);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -601,7 +597,7 @@ impl<T: Poolable> Drop for Pooled<T> {
|
|||||||
if let Ok(mut inner) = pool.lock() {
|
if let Ok(mut inner) = pool.lock() {
|
||||||
inner.put(self.key.clone(), value, &pool);
|
inner.put(self.key.clone(), value, &pool);
|
||||||
}
|
}
|
||||||
} else if self.key.1 == Ver::Http1 {
|
} else if !value.can_share() {
|
||||||
trace!("pool dropped, dropping pooled ({:?})", self.key);
|
trace!("pool dropped, dropping pooled ({:?})", self.key);
|
||||||
}
|
}
|
||||||
// Ver::Http2 is already in the Pool (or dead), so we wouldn't
|
// Ver::Http2 is already in the Pool (or dead), so we wouldn't
|
||||||
@@ -705,16 +701,22 @@ pub(super) struct Connecting<T: Poolable> {
|
|||||||
pool: WeakOpt<Mutex<PoolInner<T>>>,
|
pool: WeakOpt<Mutex<PoolInner<T>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T: Poolable> Connecting<T> {
|
||||||
|
pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
|
||||||
|
debug_assert!(
|
||||||
|
self.pool.0.is_none(),
|
||||||
|
"Connecting::alpn_h2 but already Http2"
|
||||||
|
);
|
||||||
|
|
||||||
|
pool.connecting(&self.key, Ver::Http2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<T: Poolable> Drop for Connecting<T> {
|
impl<T: Poolable> Drop for Connecting<T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(pool) = self.pool.upgrade() {
|
if let Some(pool) = self.pool.upgrade() {
|
||||||
// No need to panic on drop, that could abort!
|
// No need to panic on drop, that could abort!
|
||||||
if let Ok(mut inner) = pool.lock() {
|
if let Ok(mut inner) = pool.lock() {
|
||||||
debug_assert_eq!(
|
|
||||||
self.key.1,
|
|
||||||
Ver::Http2,
|
|
||||||
"Connecting constructed without Http2"
|
|
||||||
);
|
|
||||||
inner.connected(&self.key);
|
inner.connected(&self.key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -804,7 +806,7 @@ mod tests {
|
|||||||
use futures::{Async, Future};
|
use futures::{Async, Future};
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use common::Exec;
|
use common::Exec;
|
||||||
use super::{Connecting, Key, Poolable, Pool, Reservation, Ver, WeakOpt};
|
use super::{Connecting, Key, Poolable, Pool, Reservation, WeakOpt};
|
||||||
|
|
||||||
/// Test unique reservations.
|
/// Test unique reservations.
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
@@ -818,6 +820,10 @@ mod tests {
|
|||||||
fn reserve(self) -> Reservation<Self> {
|
fn reserve(self) -> Reservation<Self> {
|
||||||
Reservation::Unique(self)
|
Reservation::Unique(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn can_share(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn c<T: Poolable>(key: Key) -> Connecting<T> {
|
fn c<T: Poolable>(key: Key) -> Connecting<T> {
|
||||||
@@ -845,7 +851,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_pool_checkout_smoke() {
|
fn test_pool_checkout_smoke() {
|
||||||
let pool = pool_no_timer();
|
let pool = pool_no_timer();
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = Arc::new("foo".to_string());
|
||||||
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
||||||
|
|
||||||
drop(pooled);
|
drop(pooled);
|
||||||
@@ -860,7 +866,7 @@ mod tests {
|
|||||||
fn test_pool_checkout_returns_none_if_expired() {
|
fn test_pool_checkout_returns_none_if_expired() {
|
||||||
future::lazy(|| {
|
future::lazy(|| {
|
||||||
let pool = pool_no_timer();
|
let pool = pool_no_timer();
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = Arc::new("foo".to_string());
|
||||||
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
||||||
drop(pooled);
|
drop(pooled);
|
||||||
::std::thread::sleep(pool.locked().timeout.unwrap());
|
::std::thread::sleep(pool.locked().timeout.unwrap());
|
||||||
@@ -873,7 +879,7 @@ mod tests {
|
|||||||
fn test_pool_checkout_removes_expired() {
|
fn test_pool_checkout_removes_expired() {
|
||||||
future::lazy(|| {
|
future::lazy(|| {
|
||||||
let pool = pool_no_timer();
|
let pool = pool_no_timer();
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = Arc::new("foo".to_string());
|
||||||
|
|
||||||
pool.pooled(c(key.clone()), Uniq(41));
|
pool.pooled(c(key.clone()), Uniq(41));
|
||||||
pool.pooled(c(key.clone()), Uniq(5));
|
pool.pooled(c(key.clone()), Uniq(5));
|
||||||
@@ -894,7 +900,7 @@ mod tests {
|
|||||||
fn test_pool_max_idle_per_host() {
|
fn test_pool_max_idle_per_host() {
|
||||||
future::lazy(|| {
|
future::lazy(|| {
|
||||||
let pool = pool_max_idle_no_timer(2);
|
let pool = pool_max_idle_no_timer(2);
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = Arc::new("foo".to_string());
|
||||||
|
|
||||||
pool.pooled(c(key.clone()), Uniq(41));
|
pool.pooled(c(key.clone()), Uniq(41));
|
||||||
pool.pooled(c(key.clone()), Uniq(5));
|
pool.pooled(c(key.clone()), Uniq(5));
|
||||||
@@ -920,7 +926,7 @@ mod tests {
|
|||||||
&Exec::Default,
|
&Exec::Default,
|
||||||
);
|
);
|
||||||
|
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = Arc::new("foo".to_string());
|
||||||
|
|
||||||
// Since pool.pooled() will be calling spawn on drop, need to be sure
|
// Since pool.pooled() will be calling spawn on drop, need to be sure
|
||||||
// those drops are called while `rt` is the current executor. To do so,
|
// those drops are called while `rt` is the current executor. To do so,
|
||||||
@@ -945,7 +951,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_pool_checkout_task_unparked() {
|
fn test_pool_checkout_task_unparked() {
|
||||||
let pool = pool_no_timer();
|
let pool = pool_no_timer();
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = Arc::new("foo".to_string());
|
||||||
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
||||||
|
|
||||||
let checkout = pool.checkout(key).join(future::lazy(move || {
|
let checkout = pool.checkout(key).join(future::lazy(move || {
|
||||||
@@ -964,7 +970,7 @@ mod tests {
|
|||||||
fn test_pool_checkout_drop_cleans_up_waiters() {
|
fn test_pool_checkout_drop_cleans_up_waiters() {
|
||||||
future::lazy(|| {
|
future::lazy(|| {
|
||||||
let pool = pool_no_timer::<Uniq<i32>>();
|
let pool = pool_no_timer::<Uniq<i32>>();
|
||||||
let key = (Arc::new("localhost:12345".to_string()), Ver::Http1);
|
let key = Arc::new("localhost:12345".to_string());
|
||||||
|
|
||||||
let mut checkout1 = pool.checkout(key.clone());
|
let mut checkout1 = pool.checkout(key.clone());
|
||||||
let mut checkout2 = pool.checkout(key.clone());
|
let mut checkout2 = pool.checkout(key.clone());
|
||||||
@@ -1000,12 +1006,16 @@ mod tests {
|
|||||||
fn reserve(self) -> Reservation<Self> {
|
fn reserve(self) -> Reservation<Self> {
|
||||||
Reservation::Unique(self)
|
Reservation::Unique(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn can_share(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn pooled_drop_if_closed_doesnt_reinsert() {
|
fn pooled_drop_if_closed_doesnt_reinsert() {
|
||||||
let pool = pool_no_timer();
|
let pool = pool_no_timer();
|
||||||
let key = (Arc::new("localhost:12345".to_string()), Ver::Http1);
|
let key = Arc::new("localhost:12345".to_string());
|
||||||
pool.pooled(c(key.clone()), CanClose {
|
pool.pooled(c(key.clone()), CanClose {
|
||||||
val: 57,
|
val: 57,
|
||||||
closed: true,
|
closed: true,
|
||||||
|
|||||||
@@ -207,4 +207,3 @@ fn checkout_win_allows_connect_future_to_be_pooled() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
15
src/mock.rs
15
src/mock.rs
@@ -358,11 +358,12 @@ impl Read for Duplex {
|
|||||||
impl Write for Duplex {
|
impl Write for Duplex {
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
|
let ret = inner.write.write(buf);
|
||||||
if let Some(task) = inner.handle_read_task.take() {
|
if let Some(task) = inner.handle_read_task.take() {
|
||||||
trace!("waking DuplexHandle read");
|
trace!("waking DuplexHandle read");
|
||||||
task.notify();
|
task.notify();
|
||||||
}
|
}
|
||||||
inner.write.write(buf)
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
@@ -404,8 +405,7 @@ impl DuplexHandle {
|
|||||||
inner.handle_read_task = Some(task::current());
|
inner.handle_read_task = Some(task::current());
|
||||||
return Ok(Async::NotReady);
|
return Ok(Async::NotReady);
|
||||||
}
|
}
|
||||||
inner.write.inner.vec.truncate(0);
|
inner.write.read(buf).map(Async::Ready)
|
||||||
Ok(Async::Ready(inner.write.inner.len()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write(&self, bytes: &[u8]) -> Poll<usize, io::Error> {
|
pub fn write(&self, bytes: &[u8]) -> Poll<usize, io::Error> {
|
||||||
@@ -456,6 +456,13 @@ impl MockConnector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn mock_fut<F>(&mut self, key: &str, fut: F) -> DuplexHandle
|
pub fn mock_fut<F>(&mut self, key: &str, fut: F) -> DuplexHandle
|
||||||
|
where
|
||||||
|
F: Future + Send + 'static,
|
||||||
|
{
|
||||||
|
self.mock_opts(key, Connected::new(), fut)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn mock_opts<F>(&mut self, key: &str, connected: Connected, fut: F) -> DuplexHandle
|
||||||
where
|
where
|
||||||
F: Future + Send + 'static,
|
F: Future + Send + 'static,
|
||||||
{
|
{
|
||||||
@@ -465,7 +472,7 @@ impl MockConnector {
|
|||||||
|
|
||||||
let fut = Box::new(fut.then(move |_| {
|
let fut = Box::new(fut.then(move |_| {
|
||||||
trace!("MockConnector mocked fut ready");
|
trace!("MockConnector mocked fut ready");
|
||||||
Ok((duplex, Connected::new()))
|
Ok((duplex, connected))
|
||||||
}));
|
}));
|
||||||
self.mocks.lock().unwrap().entry(key)
|
self.mocks.lock().unwrap().entry(key)
|
||||||
.or_insert(Vec::new())
|
.or_insert(Vec::new())
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ use hyper::{Body, Client, Method, Request, StatusCode};
|
|||||||
use futures::{Future, Stream};
|
use futures::{Future, Stream};
|
||||||
use futures::sync::oneshot;
|
use futures::sync::oneshot;
|
||||||
use tokio::runtime::current_thread::Runtime;
|
use tokio::runtime::current_thread::Runtime;
|
||||||
use tokio::net::tcp::{ConnectFuture, TcpStream};
|
use tokio::net::tcp::{ConnectFuture, TcpListener as TkTcpListener, TcpStream};
|
||||||
|
|
||||||
fn s(buf: &[u8]) -> &str {
|
fn s(buf: &[u8]) -> &str {
|
||||||
::std::str::from_utf8(buf).expect("from_utf8")
|
::std::str::from_utf8(buf).expect("from_utf8")
|
||||||
@@ -1349,12 +1349,66 @@ mod dispatch_impl {
|
|||||||
assert_eq!(vec, b"bar=foo");
|
assert_eq!(vec, b"bar=foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn alpn_h2() {
|
||||||
|
use hyper::Response;
|
||||||
|
use hyper::server::conn::Http;
|
||||||
|
use hyper::service::service_fn_ok;
|
||||||
|
|
||||||
|
let _ = pretty_env_logger::try_init();
|
||||||
|
let mut rt = Runtime::new().unwrap();
|
||||||
|
let listener = TkTcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
let mut connector = DebugConnector::new();
|
||||||
|
connector.alpn_h2 = true;
|
||||||
|
let connects = connector.connects.clone();
|
||||||
|
|
||||||
|
let client = Client::builder()
|
||||||
|
.build::<_, ::hyper::Body>(connector);
|
||||||
|
|
||||||
|
let srv = listener.incoming()
|
||||||
|
.into_future()
|
||||||
|
.map_err(|_| unreachable!())
|
||||||
|
.and_then(|(item, _incoming)| {
|
||||||
|
let socket = item.unwrap();
|
||||||
|
Http::new()
|
||||||
|
.http2_only(true)
|
||||||
|
.serve_connection(socket, service_fn_ok(|req| {
|
||||||
|
assert_eq!(req.headers().get("host"), None);
|
||||||
|
Response::new(Body::empty())
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
.map_err(|e| panic!("server error: {}", e));
|
||||||
|
rt.spawn(srv);
|
||||||
|
|
||||||
|
|
||||||
|
assert_eq!(connects.load(Ordering::SeqCst), 0);
|
||||||
|
|
||||||
|
let url = format!("http://{}/a", addr).parse::<::hyper::Uri>().unwrap();
|
||||||
|
let res1 = client.get(url.clone());
|
||||||
|
let res2 = client.get(url.clone());
|
||||||
|
let res3 = client.get(url.clone());
|
||||||
|
rt.block_on(res1.join(res2).join(res3)).unwrap();
|
||||||
|
|
||||||
|
// Since the client doesn't know it can ALPN at first, it will have
|
||||||
|
// started 3 connections. But, the server above will only handle 1,
|
||||||
|
// so the unwrapped responses futures show it still worked.
|
||||||
|
assert_eq!(connects.load(Ordering::SeqCst), 3);
|
||||||
|
|
||||||
|
let res4 = client.get(url.clone());
|
||||||
|
rt.block_on(res4).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(connects.load(Ordering::SeqCst), 3, "after ALPN, no more connects");
|
||||||
|
drop(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
struct DebugConnector {
|
struct DebugConnector {
|
||||||
http: HttpConnector,
|
http: HttpConnector,
|
||||||
closes: mpsc::Sender<()>,
|
closes: mpsc::Sender<()>,
|
||||||
connects: Arc<AtomicUsize>,
|
connects: Arc<AtomicUsize>,
|
||||||
is_proxy: bool,
|
is_proxy: bool,
|
||||||
|
alpn_h2: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DebugConnector {
|
impl DebugConnector {
|
||||||
@@ -1370,6 +1424,7 @@ mod dispatch_impl {
|
|||||||
closes: closes,
|
closes: closes,
|
||||||
connects: Arc::new(AtomicUsize::new(0)),
|
connects: Arc::new(AtomicUsize::new(0)),
|
||||||
is_proxy: false,
|
is_proxy: false,
|
||||||
|
alpn_h2: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1388,7 +1443,11 @@ mod dispatch_impl {
|
|||||||
self.connects.fetch_add(1, Ordering::SeqCst);
|
self.connects.fetch_add(1, Ordering::SeqCst);
|
||||||
let closes = self.closes.clone();
|
let closes = self.closes.clone();
|
||||||
let is_proxy = self.is_proxy;
|
let is_proxy = self.is_proxy;
|
||||||
Box::new(self.http.connect(dst).map(move |(s, c)| {
|
let is_alpn_h2 = self.alpn_h2;
|
||||||
|
Box::new(self.http.connect(dst).map(move |(s, mut c)| {
|
||||||
|
if is_alpn_h2 {
|
||||||
|
c = c.negotiated_h2();
|
||||||
|
}
|
||||||
(DebugStream(s, closes), c.proxy(is_proxy))
|
(DebugStream(s, closes), c.proxy(is_proxy))
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user