feat(client): enable CONNECT requests through the Client
While the upgrades feature enabled HTTP upgrades in both and the server and client, and the goal was for `CONNECT` requests to work as well, only the server could use them for `CONNECT`. The `Client` had some specific code rejecting `CONNECT` requests, and this removes it and prepares the `Client` to handle them correctly.
This commit is contained in:
@@ -22,7 +22,7 @@ include = [
|
|||||||
bytes = "0.4.4"
|
bytes = "0.4.4"
|
||||||
futures = "0.1.21"
|
futures = "0.1.21"
|
||||||
futures-cpupool = { version = "0.1.6", optional = true }
|
futures-cpupool = { version = "0.1.6", optional = true }
|
||||||
http = "0.1.5"
|
http = "0.1.7"
|
||||||
httparse = "1.0"
|
httparse = "1.0"
|
||||||
h2 = "0.1.5"
|
h2 = "0.1.5"
|
||||||
iovec = "0.1"
|
iovec = "0.1"
|
||||||
|
|||||||
@@ -78,7 +78,7 @@
|
|||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io;
|
use std::mem;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@@ -193,33 +193,44 @@ where C: Connect + Sync + 'static,
|
|||||||
|
|
||||||
/// Send a constructed Request using this Client.
|
/// Send a constructed Request using this Client.
|
||||||
pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
|
pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
|
||||||
match req.version() {
|
let is_http_11 = self.ver == Ver::Http1 && match req.version() {
|
||||||
Version::HTTP_10 |
|
Version::HTTP_11 => true,
|
||||||
Version::HTTP_11 => (),
|
Version::HTTP_10 => false,
|
||||||
other => {
|
other => {
|
||||||
error!("Request has unsupported version \"{:?}\"", other);
|
error!("Request has unsupported version \"{:?}\"", other);
|
||||||
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_version())));
|
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_version())));
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
if req.method() == &Method::CONNECT {
|
let is_http_connect = req.method() == &Method::CONNECT;
|
||||||
debug!("Client does not support CONNECT requests");
|
|
||||||
|
if !is_http_11 && is_http_connect {
|
||||||
|
debug!("client does not support CONNECT requests for {:?}", req.version());
|
||||||
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method())));
|
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
let uri = req.uri().clone();
|
let uri = req.uri().clone();
|
||||||
let domain = match (uri.scheme_part(), uri.authority_part()) {
|
let domain = match (uri.scheme_part(), uri.authority_part()) {
|
||||||
(Some(scheme), Some(auth)) => {
|
(Some(scheme), Some(auth)) => {
|
||||||
format!("{}://{}", scheme, auth)
|
format!("{}://{}", scheme, auth)
|
||||||
}
|
}
|
||||||
|
(None, Some(auth)) if is_http_connect => {
|
||||||
|
let scheme = match auth.port() {
|
||||||
|
Some(443) => {
|
||||||
|
set_scheme(req.uri_mut(), Scheme::HTTPS);
|
||||||
|
"https"
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
set_scheme(req.uri_mut(), Scheme::HTTP);
|
||||||
|
"http"
|
||||||
|
},
|
||||||
|
};
|
||||||
|
format!("{}://{}", scheme, auth)
|
||||||
|
},
|
||||||
_ => {
|
_ => {
|
||||||
//TODO: replace this with a proper variant
|
debug!("Client requires absolute-form URIs, received: {:?}", uri);
|
||||||
return ResponseFuture::new(Box::new(future::err(::Error::new_io(
|
return ResponseFuture::new(Box::new(future::err(::Error::new_user_absolute_uri_required())))
|
||||||
io::Error::new(
|
|
||||||
io::ErrorKind::InvalidInput,
|
|
||||||
"invalid URI for Client Request"
|
|
||||||
)
|
|
||||||
))));
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -319,7 +330,6 @@ where C: Connect + Sync + 'static,
|
|||||||
//
|
//
|
||||||
// In both cases, we should just wait for the other future.
|
// In both cases, we should just wait for the other future.
|
||||||
if e.is_canceled() {
|
if e.is_canceled() {
|
||||||
//trace!("checkout/connect race canceled: {}", e);
|
|
||||||
Either::A(other.map_err(ClientError::Normal))
|
Either::A(other.map_err(ClientError::Normal))
|
||||||
} else {
|
} else {
|
||||||
Either::B(future::err(ClientError::Normal(e)))
|
Either::B(future::err(ClientError::Normal(e)))
|
||||||
@@ -330,8 +340,21 @@ where C: Connect + Sync + 'static,
|
|||||||
let resp = race.and_then(move |mut pooled| {
|
let resp = race.and_then(move |mut pooled| {
|
||||||
let conn_reused = pooled.is_reused();
|
let conn_reused = pooled.is_reused();
|
||||||
if ver == Ver::Http1 {
|
if ver == Ver::Http1 {
|
||||||
set_relative_uri(req.uri_mut(), pooled.is_proxied);
|
// CONNECT always sends origin-form, so check it first...
|
||||||
|
if req.method() == &Method::CONNECT {
|
||||||
|
authority_form(req.uri_mut());
|
||||||
|
} else if pooled.is_proxied {
|
||||||
|
absolute_form(req.uri_mut());
|
||||||
|
} else {
|
||||||
|
origin_form(req.uri_mut());
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
debug_assert!(
|
||||||
|
req.method() != &Method::CONNECT,
|
||||||
|
"Client should have returned Error for HTTP2 CONNECT"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let fut = pooled.send_request_retryable(req);
|
let fut = pooled.send_request_retryable(req);
|
||||||
|
|
||||||
// As of futures@0.1.21, there is a race condition in the mpsc
|
// As of futures@0.1.21, there is a race condition in the mpsc
|
||||||
@@ -612,10 +635,7 @@ enum Ver {
|
|||||||
Http2,
|
Http2,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_relative_uri(uri: &mut Uri, is_proxied: bool) {
|
fn origin_form(uri: &mut Uri) {
|
||||||
if is_proxied && uri.scheme_part() != Some(&Scheme::HTTPS) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let path = match uri.path_and_query() {
|
let path = match uri.path_and_query() {
|
||||||
Some(path) if path.as_str() != "/" => {
|
Some(path) if path.as_str() != "/" => {
|
||||||
let mut parts = ::http::uri::Parts::default();
|
let mut parts = ::http::uri::Parts::default();
|
||||||
@@ -623,10 +643,56 @@ fn set_relative_uri(uri: &mut Uri, is_proxied: bool) {
|
|||||||
Uri::from_parts(parts).expect("path is valid uri")
|
Uri::from_parts(parts).expect("path is valid uri")
|
||||||
},
|
},
|
||||||
_none_or_just_slash => {
|
_none_or_just_slash => {
|
||||||
"/".parse().expect("/ is valid path")
|
debug_assert!(Uri::default() == "/");
|
||||||
|
Uri::default()
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
*uri = path;
|
*uri = path
|
||||||
|
}
|
||||||
|
|
||||||
|
fn absolute_form(uri: &mut Uri) {
|
||||||
|
debug_assert!(uri.scheme_part().is_some(), "absolute_form needs a scheme");
|
||||||
|
debug_assert!(uri.authority_part().is_some(), "absolute_form needs an authority");
|
||||||
|
// If the URI is to HTTPS, and the connector claimed to be a proxy,
|
||||||
|
// then it *should* have tunneled, and so we don't want to send
|
||||||
|
// absolute-form in that case.
|
||||||
|
if uri.scheme_part() == Some(&Scheme::HTTPS) {
|
||||||
|
origin_form(uri);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn authority_form(uri: &mut Uri) {
|
||||||
|
if log_enabled!(::log::Level::Warn) {
|
||||||
|
if let Some(path) = uri.path_and_query() {
|
||||||
|
// `https://hyper.rs` would parse with `/` path, don't
|
||||||
|
// annoy people about that...
|
||||||
|
if path != "/" {
|
||||||
|
warn!(
|
||||||
|
"HTTP/1.1 CONNECT request stripping path: {:?}",
|
||||||
|
path
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*uri = match uri.authority_part() {
|
||||||
|
Some(auth) => {
|
||||||
|
let mut parts = ::http::uri::Parts::default();
|
||||||
|
parts.authority = Some(auth.clone());
|
||||||
|
Uri::from_parts(parts).expect("authority is valid")
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
unreachable!("authority_form with relative uri");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_scheme(uri: &mut Uri, scheme: Scheme) {
|
||||||
|
debug_assert!(uri.scheme_part().is_none(), "set_scheme expects no existing scheme");
|
||||||
|
let old = mem::replace(uri, Uri::default());
|
||||||
|
let mut parts: ::http::uri::Parts = old.into();
|
||||||
|
parts.scheme = Some(scheme);
|
||||||
|
parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
|
||||||
|
*uri = Uri::from_parts(parts).expect("scheme is valid");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Builder for a Client
|
/// Builder for a Client
|
||||||
@@ -818,8 +884,43 @@ mod unit_tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn set_relative_uri_with_implicit_path() {
|
fn set_relative_uri_with_implicit_path() {
|
||||||
let mut uri = "http://hyper.rs".parse().unwrap();
|
let mut uri = "http://hyper.rs".parse().unwrap();
|
||||||
set_relative_uri(&mut uri, false);
|
origin_form(&mut uri);
|
||||||
|
|
||||||
assert_eq!(uri.to_string(), "/");
|
assert_eq!(uri.to_string(), "/");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_origin_form() {
|
||||||
|
let mut uri = "http://hyper.rs/guides".parse().unwrap();
|
||||||
|
origin_form(&mut uri);
|
||||||
|
assert_eq!(uri.to_string(), "/guides");
|
||||||
|
|
||||||
|
let mut uri = "http://hyper.rs/guides?foo=bar".parse().unwrap();
|
||||||
|
origin_form(&mut uri);
|
||||||
|
assert_eq!(uri.to_string(), "/guides?foo=bar");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_absolute_form() {
|
||||||
|
let mut uri = "http://hyper.rs/guides".parse().unwrap();
|
||||||
|
absolute_form(&mut uri);
|
||||||
|
assert_eq!(uri.to_string(), "http://hyper.rs/guides");
|
||||||
|
|
||||||
|
let mut uri = "https://hyper.rs/guides".parse().unwrap();
|
||||||
|
absolute_form(&mut uri);
|
||||||
|
assert_eq!(uri.to_string(), "/guides");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_authority_form() {
|
||||||
|
extern crate pretty_env_logger;
|
||||||
|
let _ = pretty_env_logger::try_init();
|
||||||
|
|
||||||
|
let mut uri = "http://hyper.rs".parse().unwrap();
|
||||||
|
authority_form(&mut uri);
|
||||||
|
assert_eq!(uri.to_string(), "hyper.rs");
|
||||||
|
|
||||||
|
let mut uri = "hyper.rs".parse().unwrap();
|
||||||
|
authority_form(&mut uri);
|
||||||
|
assert_eq!(uri.to_string(), "hyper.rs");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,6 +61,8 @@ pub(crate) enum Kind {
|
|||||||
UnsupportedVersion,
|
UnsupportedVersion,
|
||||||
/// User tried to create a CONNECT Request with the Client.
|
/// User tried to create a CONNECT Request with the Client.
|
||||||
UnsupportedRequestMethod,
|
UnsupportedRequestMethod,
|
||||||
|
/// User tried to send a Request with Client with non-absolute URI.
|
||||||
|
AbsoluteUriRequired,
|
||||||
|
|
||||||
/// User tried polling for an upgrade that doesn't exist.
|
/// User tried polling for an upgrade that doesn't exist.
|
||||||
NoUpgrade,
|
NoUpgrade,
|
||||||
@@ -117,6 +119,7 @@ impl Error {
|
|||||||
Kind::Closed |
|
Kind::Closed |
|
||||||
Kind::UnsupportedVersion |
|
Kind::UnsupportedVersion |
|
||||||
Kind::UnsupportedRequestMethod |
|
Kind::UnsupportedRequestMethod |
|
||||||
|
Kind::AbsoluteUriRequired |
|
||||||
Kind::NoUpgrade |
|
Kind::NoUpgrade |
|
||||||
Kind::Execute => true,
|
Kind::Execute => true,
|
||||||
_ => false,
|
_ => false,
|
||||||
@@ -224,6 +227,10 @@ impl Error {
|
|||||||
Error::new(Kind::UnsupportedRequestMethod, None)
|
Error::new(Kind::UnsupportedRequestMethod, None)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn new_user_absolute_uri_required() -> Error {
|
||||||
|
Error::new(Kind::AbsoluteUriRequired, None)
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn new_user_no_upgrade() -> Error {
|
pub(crate) fn new_user_no_upgrade() -> Error {
|
||||||
Error::new(Kind::NoUpgrade, None)
|
Error::new(Kind::NoUpgrade, None)
|
||||||
}
|
}
|
||||||
@@ -303,6 +310,7 @@ impl StdError for Error {
|
|||||||
Kind::Http2 => "http2 general error",
|
Kind::Http2 => "http2 general error",
|
||||||
Kind::UnsupportedVersion => "request has unsupported HTTP version",
|
Kind::UnsupportedVersion => "request has unsupported HTTP version",
|
||||||
Kind::UnsupportedRequestMethod => "request has unsupported HTTP method",
|
Kind::UnsupportedRequestMethod => "request has unsupported HTTP method",
|
||||||
|
Kind::AbsoluteUriRequired => "client requires absolute-form URIs",
|
||||||
Kind::NoUpgrade => "no upgrade available",
|
Kind::NoUpgrade => "no upgrade available",
|
||||||
Kind::ManualUpgrade => "upgrade expected but low level API in use",
|
Kind::ManualUpgrade => "upgrade expected but low level API in use",
|
||||||
Kind::Execute => "executor failed to spawn task",
|
Kind::Execute => "executor failed to spawn task",
|
||||||
|
|||||||
121
tests/client.rs
121
tests/client.rs
@@ -425,32 +425,6 @@ test! {
|
|||||||
body: None,
|
body: None,
|
||||||
}
|
}
|
||||||
|
|
||||||
/*TODO: when new Connect trait allows stating connection is proxied
|
|
||||||
test! {
|
|
||||||
name: client_http_proxy,
|
|
||||||
|
|
||||||
server:
|
|
||||||
expected: "\
|
|
||||||
GET http://{addr}/proxy HTTP/1.1\r\n\
|
|
||||||
host: {addr}\r\n\
|
|
||||||
\r\n\
|
|
||||||
",
|
|
||||||
reply: REPLY_OK,
|
|
||||||
|
|
||||||
client:
|
|
||||||
proxy: true,
|
|
||||||
request:
|
|
||||||
method: GET,
|
|
||||||
url: "http://{addr}/proxy",
|
|
||||||
headers: {},
|
|
||||||
body: None,
|
|
||||||
response:
|
|
||||||
status: OK,
|
|
||||||
headers: {},
|
|
||||||
body: None,
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
test! {
|
test! {
|
||||||
name: client_head_ignores_body,
|
name: client_head_ignores_body,
|
||||||
|
|
||||||
@@ -509,6 +483,21 @@ test! {
|
|||||||
body: None,
|
body: None,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test! {
|
||||||
|
name: client_requires_absolute_uri,
|
||||||
|
|
||||||
|
server:
|
||||||
|
expected: "won't get here {addr}",
|
||||||
|
reply: "won't reply",
|
||||||
|
|
||||||
|
client:
|
||||||
|
request:
|
||||||
|
method: GET,
|
||||||
|
url: "/relative-{addr}",
|
||||||
|
headers: {},
|
||||||
|
body: None,
|
||||||
|
error: |err| err.to_string() == "client requires absolute-form URIs",
|
||||||
|
}
|
||||||
|
|
||||||
test! {
|
test! {
|
||||||
name: client_error_unexpected_eof,
|
name: client_error_unexpected_eof,
|
||||||
@@ -596,19 +585,51 @@ test! {
|
|||||||
server:
|
server:
|
||||||
expected: "\
|
expected: "\
|
||||||
CONNECT {addr} HTTP/1.1\r\n\
|
CONNECT {addr} HTTP/1.1\r\n\
|
||||||
Host: {addr}\r\n\
|
host: {addr}\r\n\
|
||||||
|
\r\n\
|
||||||
|
",
|
||||||
|
reply: "\
|
||||||
|
HTTP/1.1 200 OK\r\n\
|
||||||
\r\n\
|
\r\n\
|
||||||
",
|
",
|
||||||
// won't ever get to reply
|
|
||||||
reply: "",
|
|
||||||
|
|
||||||
client:
|
client:
|
||||||
request:
|
request:
|
||||||
method: CONNECT,
|
method: CONNECT,
|
||||||
url: "http://{addr}/",
|
url: "{addr}",
|
||||||
|
headers: {},
|
||||||
|
body: None,
|
||||||
|
response:
|
||||||
|
status: OK,
|
||||||
|
headers: {},
|
||||||
|
body: None,
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
test! {
|
||||||
|
name: client_connect_method_with_absolute_uri,
|
||||||
|
|
||||||
|
server:
|
||||||
|
expected: "\
|
||||||
|
CONNECT {addr} HTTP/1.1\r\n\
|
||||||
|
host: {addr}\r\n\
|
||||||
|
\r\n\
|
||||||
|
",
|
||||||
|
reply: "\
|
||||||
|
HTTP/1.1 200 OK\r\n\
|
||||||
|
\r\n\
|
||||||
|
",
|
||||||
|
|
||||||
|
client:
|
||||||
|
request:
|
||||||
|
method: CONNECT,
|
||||||
|
url: "http://{addr}",
|
||||||
|
headers: {},
|
||||||
|
body: None,
|
||||||
|
response:
|
||||||
|
status: OK,
|
||||||
headers: {},
|
headers: {},
|
||||||
body: None,
|
body: None,
|
||||||
error: |err| err.is_user(),
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1222,6 +1243,44 @@ mod dispatch_impl {
|
|||||||
rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
|
rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn connect_proxy_http_connect_sends_authority_form() {
|
||||||
|
let _ = pretty_env_logger::try_init();
|
||||||
|
let server = TcpListener::bind("127.0.0.1:0").unwrap();
|
||||||
|
let addr = server.local_addr().unwrap();
|
||||||
|
let mut rt = Runtime::new().unwrap();
|
||||||
|
let connector = DebugConnector::new()
|
||||||
|
.proxy();
|
||||||
|
|
||||||
|
let client = Client::builder()
|
||||||
|
.build(connector);
|
||||||
|
|
||||||
|
let (tx1, rx1) = oneshot::channel();
|
||||||
|
thread::spawn(move || {
|
||||||
|
let mut sock = server.accept().unwrap().0;
|
||||||
|
//drop(server);
|
||||||
|
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
|
||||||
|
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
|
||||||
|
let mut buf = [0; 4096];
|
||||||
|
let n = sock.read(&mut buf).expect("read 1");
|
||||||
|
let expected = format!("CONNECT {addr} HTTP/1.1\r\nhost: {addr}\r\n\r\n", addr=addr);
|
||||||
|
assert_eq!(s(&buf[..n]), expected);
|
||||||
|
|
||||||
|
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 1");
|
||||||
|
let _ = tx1.send(());
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
let rx = rx1.expect("thread panicked");
|
||||||
|
let req = Request::builder()
|
||||||
|
.method("CONNECT")
|
||||||
|
.uri(&*format!("http://{}/useless/path", addr))
|
||||||
|
.body(Body::empty())
|
||||||
|
.unwrap();
|
||||||
|
let res = client.request(req);
|
||||||
|
rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn client_upgrade() {
|
fn client_upgrade() {
|
||||||
use tokio_io::io::{read_to_end, write_all};
|
use tokio_io::io::{read_to_end, write_all};
|
||||||
|
|||||||
Reference in New Issue
Block a user