feat(http1): Add higher-level HTTP upgrade support to Client and Server (#1563)

- Adds `Body::on_upgrade()` that returns an `OnUpgrade` future.
- Adds `hyper::upgrade` module containing types for dealing with
  upgrades.
- Adds `server::conn::Connection::with_upgrades()` method to enable
  these upgrades when using lower-level API (because of a missing
  `Send` bound on the transport generic).
- Client connections are automatically enabled.
- Optimizes request parsing, to make up for extra work to look for
  upgrade requests.
  - Returns a smaller `DecodedLength` type instead of the fatter
    `Decoder`, which should also allow a couple fewer branches.
  - Removes the `Decode::Ignore` wrapper enum, and instead ignoring
    1xx responses is handled directly in the response parsing code.

Ref #1563 

Closes #1395
This commit is contained in:
Sean McArthur
2018-06-14 13:39:29 -07:00
committed by GitHub
parent 1c3fbfd6bf
commit fea29b29e2
26 changed files with 1271 additions and 574 deletions

View File

@@ -8,9 +8,9 @@ use http::{HeaderMap, Method, Version};
use tokio_io::{AsyncRead, AsyncWrite};
use ::Chunk;
use proto::{BodyLength, MessageHead};
use proto::{BodyLength, DecodedLength, MessageHead};
use super::io::{Buffered};
use super::{EncodedBuf, Encode, Encoder, Decode, Decoder, Http1Transaction, ParseContext};
use super::{EncodedBuf, Encode, Encoder, /*Decode,*/ Decoder, Http1Transaction, ParseContext};
const H2_PREFACE: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
@@ -44,6 +44,7 @@ where I: AsyncRead + AsyncWrite,
notify_read: false,
reading: Reading::Init,
writing: Writing::Init,
upgrade: None,
// We assume a modern world where the remote speaks HTTP/1.1.
// If they tell us otherwise, we'll downgrade in `read_head`.
version: Version::HTTP_11,
@@ -72,6 +73,10 @@ where I: AsyncRead + AsyncWrite,
self.io.into_inner()
}
pub fn pending_upgrade(&mut self) -> Option<::upgrade::Pending> {
self.state.upgrade.take()
}
pub fn is_read_closed(&self) -> bool {
self.state.is_read_closed()
}
@@ -114,80 +119,61 @@ where I: AsyncRead + AsyncWrite,
read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
}
pub fn read_head(&mut self) -> Poll<Option<(MessageHead<T::Incoming>, Option<BodyLength>)>, ::Error> {
pub fn read_head(&mut self) -> Poll<Option<(MessageHead<T::Incoming>, DecodedLength, bool)>, ::Error> {
debug_assert!(self.can_read_head());
trace!("Conn::read_head");
loop {
let msg = match self.io.parse::<T>(ParseContext {
cached_headers: &mut self.state.cached_headers,
req_method: &mut self.state.method,
}) {
Ok(Async::Ready(msg)) => msg,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
// If we are currently waiting on a message, then an empty
// message should be reported as an error. If not, it is just
// the connection closing gracefully.
let must_error = self.should_error_on_eof();
self.state.close_read();
self.io.consume_leading_lines();
let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
return if was_mid_parse || must_error {
// We check if the buf contains the h2 Preface
debug!("parse error ({}) with {} bytes", e, self.io.read_buf().len());
self.on_parse_error(e)
.map(|()| Async::NotReady)
} else {
debug!("read eof");
Ok(Async::Ready(None))
};
}
};
let msg = match self.io.parse::<T>(ParseContext {
cached_headers: &mut self.state.cached_headers,
req_method: &mut self.state.method,
}) {
Ok(Async::Ready(msg)) => msg,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => return self.on_read_head_error(e),
};
self.state.version = msg.head.version;
let head = msg.head;
let decoder = match msg.decode {
Decode::Normal(d) => {
d
},
Decode::Final(d) => {
trace!("final decoder, HTTP ending");
debug_assert!(d.is_eof());
self.state.close_read();
d
},
Decode::Ignore => {
// likely a 1xx message that we can ignore
continue;
}
};
debug!("incoming body is {}", decoder);
// Note: don't deconstruct `msg` into local variables, it appears
// the optimizer doesn't remove the extra copies.
self.state.busy();
debug!("incoming body is {}", msg.decode);
self.state.busy();
self.state.keep_alive &= msg.keep_alive;
self.state.version = msg.head.version;
if msg.decode == DecodedLength::ZERO {
debug_assert!(!msg.expect_continue, "expect-continue needs a body");
self.state.reading = Reading::KeepAlive;
if !T::should_read_first() {
self.try_keep_alive();
}
} else {
if msg.expect_continue {
let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
self.io.headers_buf().extend_from_slice(cont);
}
let wants_keep_alive = msg.keep_alive;
self.state.keep_alive &= wants_keep_alive;
self.state.reading = Reading::Body(Decoder::new(msg.decode));
};
let content_length = decoder.content_length();
Ok(Async::Ready(Some((msg.head, msg.decode, msg.wants_upgrade))))
}
if let Reading::Closed = self.state.reading {
// actually want an `if not let ...`
} else {
self.state.reading = if content_length.is_none() {
Reading::KeepAlive
} else {
Reading::Body(decoder)
};
}
if content_length.is_none() {
self.try_keep_alive();
}
return Ok(Async::Ready(Some((head, content_length))));
fn on_read_head_error<Z>(&mut self, e: ::Error) -> Poll<Option<Z>, ::Error> {
// If we are currently waiting on a message, then an empty
// message should be reported as an error. If not, it is just
// the connection closing gracefully.
let must_error = self.should_error_on_eof();
self.state.close_read();
self.io.consume_leading_lines();
let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
if was_mid_parse || must_error {
// We check if the buf contains the h2 Preface
debug!("parse error ({}) with {} bytes", e, self.io.read_buf().len());
self.on_parse_error(e)
.map(|()| Async::NotReady)
} else {
debug!("read eof");
Ok(Async::Ready(None))
}
}
@@ -612,6 +598,10 @@ where I: AsyncRead + AsyncWrite,
}
}
pub(super) fn on_upgrade(&mut self) -> ::upgrade::OnUpgrade {
self.state.prepare_upgrade()
}
// Used in h1::dispatch tests
#[cfg(test)]
pub(super) fn io_mut(&mut self) -> &mut I {
@@ -649,6 +639,8 @@ struct State {
reading: Reading,
/// State of allowed writes
writing: Writing,
/// An expected pending HTTP upgrade.
upgrade: Option<::upgrade::Pending>,
/// Either HTTP/1.0 or 1.1 connection
version: Version,
}
@@ -697,6 +689,7 @@ impl fmt::Debug for Writing {
impl ::std::ops::BitAndAssign<bool> for KA {
fn bitand_assign(&mut self, enabled: bool) {
if !enabled {
trace!("remote disabling keep-alive");
*self = KA::Disabled;
}
}
@@ -821,11 +814,53 @@ impl State {
_ => false
}
}
fn prepare_upgrade(&mut self) -> ::upgrade::OnUpgrade {
trace!("prepare possible HTTP upgrade");
debug_assert!(self.upgrade.is_none());
let (tx, rx) = ::upgrade::pending();
self.upgrade = Some(tx);
rx
}
}
#[cfg(test)]
//TODO: rewrite these using dispatch
mod tests {
#[cfg(feature = "nightly")]
#[bench]
fn bench_read_head_short(b: &mut ::test::Bencher) {
use super::*;
let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
let len = s.len();
b.bytes = len as u64;
let mut io = ::mock::AsyncIo::new_buf(Vec::new(), 0);
io.panic();
let mut conn = Conn::<_, ::Chunk, ::proto::h1::ServerTransaction>::new(io);
*conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
conn.state.cached_headers = Some(HeaderMap::with_capacity(2));
b.iter(|| {
match conn.read_head().unwrap() {
Async::Ready(Some(x)) => {
::test::black_box(&x);
let mut headers = x.0.headers;
headers.clear();
conn.state.cached_headers = Some(headers);
},
f => panic!("expected Ready(Some(..)): {:?}", f)
}
conn.io.read_buf_mut().reserve(1);
unsafe {
conn.io.read_buf_mut().set_len(len);
}
conn.state.reading = Reading::Init;
});
}
/*
use futures::{Async, Future, Stream, Sink};
use futures::future;

View File

@@ -7,7 +7,7 @@ use futures::{Async, Poll};
use bytes::Bytes;
use super::io::MemRead;
use super::BodyLength;
use super::{DecodedLength};
use self::Kind::{Length, Chunked, Eof};
@@ -74,6 +74,14 @@ impl Decoder {
Decoder { kind: Kind::Eof(false) }
}
pub(super) fn new(len: DecodedLength) -> Self {
match len {
DecodedLength::CHUNKED => Decoder::chunked(),
DecodedLength::CLOSE_DELIMITED => Decoder::eof(),
length => Decoder::length(length.danger_len()),
}
}
// methods
pub fn is_eof(&self) -> bool {
@@ -85,16 +93,6 @@ impl Decoder {
}
}
pub fn content_length(&self) -> Option<BodyLength> {
match self.kind {
Length(0) |
Chunked(ChunkedState::End, _) |
Eof(true) => None,
Length(len) => Some(BodyLength::Known(len)),
_ => Some(BodyLength::Unknown),
}
}
pub fn decode<R: MemRead>(&mut self, body: &mut R) -> Poll<Bytes, io::Error> {
trace!("decode; state={:?}", self.kind);
match self.kind {
@@ -152,16 +150,6 @@ impl fmt::Debug for Decoder {
}
}
impl fmt::Display for Decoder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.kind {
Kind::Length(n) => write!(f, "content-length ({} bytes)", n),
Kind::Chunked(..) => f.write_str("chunked encoded"),
Kind::Eof(..) => f.write_str("until end-of-file"),
}
}
}
macro_rules! byte (
($rdr:ident) => ({
let buf = try_ready!($rdr.read_mem(1));

View File

@@ -5,7 +5,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use body::{Body, Payload};
use body::internal::FullDataArg;
use proto::{BodyLength, Conn, MessageHead, RequestHead, RequestLine, ResponseHead};
use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead};
use super::Http1Transaction;
use service::Service;
@@ -65,32 +65,34 @@ where
(io, buf, self.dispatch)
}
/// The "Future" poll function. Runs this dispatcher until the
/// connection is shutdown, or an error occurs.
pub fn poll_until_shutdown(&mut self) -> Poll<(), ::Error> {
self.poll_catch(true)
}
/// Run this dispatcher until HTTP says this connection is done,
/// but don't call `AsyncWrite::shutdown` on the underlying IO.
///
/// This is useful for HTTP upgrades.
/// This is useful for old-style HTTP upgrades, but ignores
/// newer-style upgrade API.
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
self.poll_catch(false)
.map(|x| {
x.map(|ds| if let Dispatched::Upgrade(pending) = ds {
pending.manual();
})
})
}
fn poll_catch(&mut self, should_shutdown: bool) -> Poll<(), ::Error> {
fn poll_catch(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error> {
self.poll_inner(should_shutdown).or_else(|e| {
// An error means we're shutting down either way.
// We just try to give the error to the user,
// and close the connection with an Ok. If we
// cannot give it to the user, then return the Err.
self.dispatch.recv_msg(Err(e)).map(Async::Ready)
self.dispatch.recv_msg(Err(e))?;
Ok(Async::Ready(Dispatched::Shutdown))
})
}
fn poll_inner(&mut self, should_shutdown: bool) -> Poll<(), ::Error> {
fn poll_inner(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error> {
T::update_date();
loop {
self.poll_read()?;
self.poll_write()?;
@@ -110,11 +112,14 @@ where
}
if self.is_done() {
if should_shutdown {
if let Some(pending) = self.conn.pending_upgrade() {
self.conn.take_error()?;
return Ok(Async::Ready(Dispatched::Upgrade(pending)));
} else if should_shutdown {
try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown));
}
self.conn.take_error()?;
Ok(Async::Ready(()))
Ok(Async::Ready(Dispatched::Shutdown))
} else {
Ok(Async::NotReady)
}
@@ -190,20 +195,18 @@ where
}
// dispatch is ready for a message, try to read one
match self.conn.read_head() {
Ok(Async::Ready(Some((head, body_len)))) => {
let body = if let Some(body_len) = body_len {
let (mut tx, rx) =
Body::new_channel(if let BodyLength::Known(len) = body_len {
Some(len)
} else {
None
});
let _ = tx.poll_ready(); // register this task if rx is dropped
self.body_tx = Some(tx);
rx
} else {
Body::empty()
Ok(Async::Ready(Some((head, body_len, wants_upgrade)))) => {
let mut body = match body_len {
DecodedLength::ZERO => Body::empty(),
other => {
let (tx, rx) = Body::new_channel(other.into_opt());
self.body_tx = Some(tx);
rx
},
};
if wants_upgrade {
body.set_on_upgrade(self.conn.on_upgrade());
}
self.dispatch.recv_msg(Ok((head, body)))?;
Ok(Async::Ready(()))
}
@@ -326,7 +329,6 @@ where
}
}
impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
where
D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>,
@@ -334,12 +336,12 @@ where
T: Http1Transaction,
Bs: Payload,
{
type Item = ();
type Item = Dispatched;
type Error = ::Error;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.poll_until_shutdown()
self.poll_catch(true)
}
}
@@ -519,7 +521,7 @@ mod tests {
use super::*;
use mock::AsyncIo;
use proto::ClientTransaction;
use proto::h1::ClientTransaction;
#[test]
fn client_read_bytes_before_writing_request() {

View File

@@ -93,6 +93,12 @@ where
self.read_buf.as_ref()
}
#[cfg(test)]
#[cfg(feature = "nightly")]
pub(super) fn read_buf_mut(&mut self) -> &mut BytesMut {
&mut self.read_buf
}
pub fn headers_buf(&mut self) -> &mut Vec<u8> {
let buf = self.write_buf.headers_mut();
&mut buf.bytes
@@ -595,7 +601,7 @@ mod tests {
cached_headers: &mut None,
req_method: &mut None,
};
assert!(buffered.parse::<::proto::ClientTransaction>(ctx).unwrap().is_not_ready());
assert!(buffered.parse::<::proto::h1::ClientTransaction>(ctx).unwrap().is_not_ready());
assert!(buffered.io.blocked());
}

View File

@@ -1,7 +1,7 @@
use bytes::BytesMut;
use http::{HeaderMap, Method};
use proto::{MessageHead, BodyLength};
use proto::{MessageHead, BodyLength, DecodedLength};
pub(crate) use self::conn::Conn;
pub(crate) use self::dispatch::Dispatcher;
@@ -19,12 +19,8 @@ mod io;
mod role;
pub(crate) type ServerTransaction = self::role::Server<self::role::YesUpgrades>;
//pub type ServerTransaction = self::role::Server<self::role::NoUpgrades>;
//pub type ServerUpgradeTransaction = self::role::Server<self::role::YesUpgrades>;
pub(crate) type ClientTransaction = self::role::Client<self::role::NoUpgrades>;
pub(crate) type ClientUpgradeTransaction = self::role::Client<self::role::YesUpgrades>;
pub(crate) type ServerTransaction = role::Server;
pub(crate) type ClientTransaction = role::Client;
pub(crate) trait Http1Transaction {
type Incoming;
@@ -40,14 +36,16 @@ pub(crate) trait Http1Transaction {
fn update_date() {}
}
/// Result newtype for Http1Transaction::parse.
pub(crate) type ParseResult<T> = Result<Option<ParsedMessage<T>>, ::error::Parse>;
#[derive(Debug)]
pub(crate) struct ParsedMessage<T> {
head: MessageHead<T>,
decode: Decode,
decode: DecodedLength,
expect_continue: bool,
keep_alive: bool,
wants_upgrade: bool,
}
pub(crate) struct ParseContext<'a> {
@@ -64,12 +62,3 @@ pub(crate) struct Encode<'a, T: 'a> {
title_case_headers: bool,
}
#[derive(Debug, PartialEq)]
pub enum Decode {
/// Decode normally.
Normal(Decoder),
/// After this decoder is done, HTTP is done.
Final(Decoder),
/// A header block that should be ignored, like unknown 1xx responses.
Ignore,
}

View File

@@ -8,25 +8,19 @@ use httparse;
use error::Parse;
use headers;
use proto::{BodyLength, MessageHead, RequestLine, RequestHead};
use proto::h1::{Decode, Decoder, Encode, Encoder, Http1Transaction, ParseResult, ParseContext, ParsedMessage, date};
use proto::{BodyLength, DecodedLength, MessageHead, RequestLine, RequestHead};
use proto::h1::{Encode, Encoder, Http1Transaction, ParseResult, ParseContext, ParsedMessage, date};
const MAX_HEADERS: usize = 100;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
// There are 2 main roles, Client and Server.
//
// There is 1 modifier, OnUpgrade, which can wrap Client and Server,
// to signal that HTTP upgrades are not supported.
pub(crate) struct Client<T>(T);
pub(crate) enum Client {}
pub(crate) struct Server<T>(T);
pub(crate) enum Server {}
impl<T> Http1Transaction for Server<T>
where
T: OnUpgrade,
{
impl Http1Transaction for Server {
type Incoming = RequestLine;
type Outgoing = StatusCode;
@@ -34,31 +28,45 @@ where
if buf.len() == 0 {
return Ok(None);
}
let mut keep_alive;
let is_http_11;
let subject;
let version;
let len;
let headers_len;
// Unsafe: both headers_indices and headers are using unitialized memory,
// but we *never* read any of it until after httparse has assigned
// values into it. By not zeroing out the stack memory, this saves
// a good ~5% on pipeline benchmarks.
let mut headers_indices: [HeaderIndices; MAX_HEADERS] = unsafe { mem::uninitialized() };
let (len, subject, version, headers_len) = {
{
let mut headers: [httparse::Header; MAX_HEADERS] = unsafe { mem::uninitialized() };
trace!("Request.parse([Header; {}], [u8; {}])", headers.len(), buf.len());
let mut req = httparse::Request::new(&mut headers);
let bytes = buf.as_ref();
match req.parse(bytes)? {
httparse::Status::Complete(len) => {
trace!("Request.parse Complete({})", len);
let method = Method::from_bytes(req.method.unwrap().as_bytes())?;
let path = req.path.unwrap().parse()?;
let subject = RequestLine(method, path);
let version = if req.version.unwrap() == 1 {
httparse::Status::Complete(parsed_len) => {
trace!("Request.parse Complete({})", parsed_len);
len = parsed_len;
subject = RequestLine(
Method::from_bytes(req.method.unwrap().as_bytes())?,
req.path.unwrap().parse()?
);
version = if req.version.unwrap() == 1 {
keep_alive = true;
is_http_11 = true;
Version::HTTP_11
} else {
keep_alive = false;
is_http_11 = false;
Version::HTTP_10
};
record_header_indices(bytes, &req.headers, &mut headers_indices);
let headers_len = req.headers.len();
(len, subject, version, headers_len)
headers_len = req.headers.len();
//(len, subject, version, headers_len)
}
httparse::Status::Partial => return Ok(None),
}
@@ -76,12 +84,12 @@ where
// 7. (irrelevant to Request)
let mut decoder = None;
let mut decoder = DecodedLength::ZERO;
let mut expect_continue = false;
let mut keep_alive = version == Version::HTTP_11;
let mut con_len = None;
let mut is_te = false;
let mut is_te_chunked = false;
let mut wants_upgrade = subject.0 == Method::CONNECT;
let mut headers = ctx.cached_headers
.take()
@@ -104,16 +112,14 @@ where
// If Transfer-Encoding header is present, and 'chunked' is
// not the final encoding, and this is a Request, then it is
// mal-formed. A server should respond with 400 Bad Request.
if version == Version::HTTP_10 {
if !is_http_11 {
debug!("HTTP/1.0 cannot have Transfer-Encoding header");
return Err(Parse::Header);
}
is_te = true;
if headers::is_chunked_(&value) {
is_te_chunked = true;
decoder = Some(Decoder::chunked());
//debug!("request with transfer-encoding header, but not chunked, bad request");
//return Err(Parse::Header);
decoder = DecodedLength::CHUNKED;
}
},
header::CONTENT_LENGTH => {
@@ -135,8 +141,8 @@ where
// we don't need to append this secondary length
continue;
}
decoder = DecodedLength::checked_new(len)?;
con_len = Some(len);
decoder = Some(Decoder::length(len));
},
header::CONNECTION => {
// keep_alive was previously set to default for Version
@@ -152,6 +158,10 @@ where
header::EXPECT => {
expect_continue = value.as_bytes() == b"100-continue";
},
header::UPGRADE => {
// Upgrades are only allowed with HTTP/1.1
wants_upgrade = is_http_11;
},
_ => (),
}
@@ -159,15 +169,10 @@ where
headers.append(name, value);
}
let decoder = if let Some(decoder) = decoder {
decoder
} else {
if is_te && !is_te_chunked {
debug!("request with transfer-encoding header, but not chunked, bad request");
return Err(Parse::Header);
}
Decoder::length(0)
};
if is_te && !is_te_chunked {
debug!("request with transfer-encoding header, but not chunked, bad request");
return Err(Parse::Header);
}
*ctx.req_method = Some(subject.0.clone());
@@ -177,9 +182,10 @@ where
subject,
headers,
},
decode: Decode::Normal(decoder),
decode: decoder,
expect_continue,
keep_alive,
wants_upgrade,
}))
}
@@ -194,7 +200,7 @@ where
let is_upgrade = msg.head.subject == StatusCode::SWITCHING_PROTOCOLS
|| (msg.req_method == &Some(Method::CONNECT) && msg.head.subject.is_success());
let (ret, mut is_last) = if is_upgrade {
(T::on_encode_upgrade(&mut msg), true)
(Ok(()), true)
} else if msg.head.subject.is_informational() {
error!("response with 1xx status code not supported");
*msg.head = MessageHead::default();
@@ -485,7 +491,7 @@ where
}
}
impl Server<()> {
impl Server {
fn can_have_body(method: &Option<Method>, status: StatusCode) -> bool {
Server::can_chunked(method, status)
}
@@ -508,65 +514,69 @@ impl Server<()> {
}
}
impl<T> Http1Transaction for Client<T>
where
T: OnUpgrade,
{
impl Http1Transaction for Client {
type Incoming = StatusCode;
type Outgoing = RequestLine;
fn parse(buf: &mut BytesMut, ctx: ParseContext) -> ParseResult<StatusCode> {
if buf.len() == 0 {
return Ok(None);
}
// Unsafe: see comment in Server Http1Transaction, above.
let mut headers_indices: [HeaderIndices; MAX_HEADERS] = unsafe { mem::uninitialized() };
let (len, status, version, headers_len) = {
let mut headers: [httparse::Header; MAX_HEADERS] = unsafe { mem::uninitialized() };
trace!("Response.parse([Header; {}], [u8; {}])", headers.len(), buf.len());
let mut res = httparse::Response::new(&mut headers);
let bytes = buf.as_ref();
match res.parse(bytes)? {
httparse::Status::Complete(len) => {
trace!("Response.parse Complete({})", len);
let status = StatusCode::from_u16(res.code.unwrap())?;
let version = if res.version.unwrap() == 1 {
Version::HTTP_11
} else {
Version::HTTP_10
};
record_header_indices(bytes, &res.headers, &mut headers_indices);
let headers_len = res.headers.len();
(len, status, version, headers_len)
},
httparse::Status::Partial => return Ok(None),
// Loop to skip information status code headers (100 Continue, etc).
loop {
if buf.len() == 0 {
return Ok(None);
}
};
// Unsafe: see comment in Server Http1Transaction, above.
let mut headers_indices: [HeaderIndices; MAX_HEADERS] = unsafe { mem::uninitialized() };
let (len, status, version, headers_len) = {
let mut headers: [httparse::Header; MAX_HEADERS] = unsafe { mem::uninitialized() };
trace!("Response.parse([Header; {}], [u8; {}])", headers.len(), buf.len());
let mut res = httparse::Response::new(&mut headers);
let bytes = buf.as_ref();
match res.parse(bytes)? {
httparse::Status::Complete(len) => {
trace!("Response.parse Complete({})", len);
let status = StatusCode::from_u16(res.code.unwrap())?;
let version = if res.version.unwrap() == 1 {
Version::HTTP_11
} else {
Version::HTTP_10
};
record_header_indices(bytes, &res.headers, &mut headers_indices);
let headers_len = res.headers.len();
(len, status, version, headers_len)
},
httparse::Status::Partial => return Ok(None),
}
};
let slice = buf.split_to(len).freeze();
let slice = buf.split_to(len).freeze();
let mut headers = ctx.cached_headers
.take()
.unwrap_or_else(HeaderMap::new);
let mut headers = ctx.cached_headers
.take()
.unwrap_or_else(HeaderMap::new);
headers.reserve(headers_len);
fill_headers(&mut headers, slice, &headers_indices[..headers_len]);
headers.reserve(headers_len);
fill_headers(&mut headers, slice, &headers_indices[..headers_len]);
let keep_alive = version == Version::HTTP_11;
let keep_alive = version == Version::HTTP_11;
let head = MessageHead {
version,
subject: status,
headers,
};
let decode = Client::<T>::decoder(&head, ctx.req_method)?;
let head = MessageHead {
version,
subject: status,
headers,
};
if let Some((decode, is_upgrade)) = Client::decoder(&head, ctx.req_method)? {
return Ok(Some(ParsedMessage {
head,
decode,
expect_continue: false,
// a client upgrade means the connection can't be used
// again, as it is definitely upgrading.
keep_alive: keep_alive && !is_upgrade,
wants_upgrade: is_upgrade,
}));
}
Ok(Some(ParsedMessage {
head,
decode,
expect_continue: false,
keep_alive,
}))
}
}
fn encode(msg: Encode<Self::Outgoing>, dst: &mut Vec<u8>) -> ::Result<Encoder> {
@@ -617,8 +627,11 @@ where
}
}
impl<T: OnUpgrade> Client<T> {
fn decoder(inc: &MessageHead<StatusCode>, method: &mut Option<Method>) -> Result<Decode, Parse> {
impl Client {
/// Returns Some(length, wants_upgrade) if successful.
///
/// Returns None if this message head should be skipped (like a 100 status).
fn decoder(inc: &MessageHead<StatusCode>, method: &mut Option<Method>) -> Result<Option<(DecodedLength, bool)>, Parse> {
// According to https://tools.ietf.org/html/rfc7230#section-3.3.3
// 1. HEAD responses, and Status 1xx, 204, and 304 cannot have a body.
// 2. Status 2xx to a CONNECT cannot have a body.
@@ -630,23 +643,23 @@ impl<T: OnUpgrade> Client<T> {
match inc.subject.as_u16() {
101 => {
return T::on_decode_upgrade().map(Decode::Final);
return Ok(Some((DecodedLength::ZERO, true)));
},
100...199 => {
trace!("ignoring informational response: {}", inc.subject.as_u16());
return Ok(Decode::Ignore);
return Ok(None);
},
204 |
304 => return Ok(Decode::Normal(Decoder::length(0))),
304 => return Ok(Some((DecodedLength::ZERO, false))),
_ => (),
}
match *method {
Some(Method::HEAD) => {
return Ok(Decode::Normal(Decoder::length(0)));
return Ok(Some((DecodedLength::ZERO, false)));
}
Some(Method::CONNECT) => match inc.subject.as_u16() {
200...299 => {
return Ok(Decode::Final(Decoder::length(0)));
return Ok(Some((DecodedLength::ZERO, true)));
},
_ => {},
},
@@ -665,24 +678,24 @@ impl<T: OnUpgrade> Client<T> {
debug!("HTTP/1.0 cannot have Transfer-Encoding header");
Err(Parse::Header)
} else if headers::transfer_encoding_is_chunked(&inc.headers) {
Ok(Decode::Normal(Decoder::chunked()))
Ok(Some((DecodedLength::CHUNKED, false)))
} else {
trace!("not chunked, read till eof");
Ok(Decode::Normal(Decoder::eof()))
Ok(Some((DecodedLength::CHUNKED, false)))
}
} else if let Some(len) = headers::content_length_parse_all(&inc.headers) {
Ok(Decode::Normal(Decoder::length(len)))
Ok(Some((DecodedLength::checked_new(len)?, false)))
} else if inc.headers.contains_key(header::CONTENT_LENGTH) {
debug!("illegal Content-Length header");
Err(Parse::Header)
} else {
trace!("neither Transfer-Encoding nor Content-Length");
Ok(Decode::Normal(Decoder::eof()))
Ok(Some((DecodedLength::CLOSE_DELIMITED, false)))
}
}
}
impl Client<()> {
impl Client {
fn set_length(head: &mut RequestHead, body: Option<BodyLength>) -> Encoder {
if let Some(body) = body {
let can_chunked = head.version == Version::HTTP_11
@@ -830,51 +843,6 @@ fn set_content_length(headers: &mut HeaderMap, len: u64) -> Encoder {
}
}
pub(crate) trait OnUpgrade {
fn on_encode_upgrade(msg: &mut Encode<StatusCode>) -> ::Result<()>;
fn on_decode_upgrade() -> Result<Decoder, Parse>;
}
pub(crate) enum YesUpgrades {}
pub(crate) enum NoUpgrades {}
impl OnUpgrade for YesUpgrades {
fn on_encode_upgrade(_: &mut Encode<StatusCode>) -> ::Result<()> {
Ok(())
}
fn on_decode_upgrade() -> Result<Decoder, Parse> {
debug!("101 response received, upgrading");
// 101 upgrades always have no body
Ok(Decoder::length(0))
}
}
impl OnUpgrade for NoUpgrades {
fn on_encode_upgrade(msg: &mut Encode<StatusCode>) -> ::Result<()> {
*msg.head = MessageHead::default();
msg.head.subject = ::StatusCode::INTERNAL_SERVER_ERROR;
msg.body = None;
if msg.head.subject == StatusCode::SWITCHING_PROTOCOLS {
error!("response with 101 status code not supported");
Err(Parse::UpgradeNotSupported.into())
} else if msg.req_method == &Some(Method::CONNECT) {
error!("200 response to CONNECT request not supported");
Err(::Error::new_user_unsupported_request_method())
} else {
debug_assert!(false, "upgrade incorrectly detected");
Err(::Error::new_status())
}
}
fn on_decode_upgrade() -> Result<Decoder, Parse> {
debug!("received 101 upgrade response, not supported");
Err(Parse::UpgradeNotSupported)
}
}
#[derive(Clone, Copy)]
struct HeaderIndices {
name: (usize, usize),
@@ -978,10 +946,6 @@ mod tests {
use bytes::BytesMut;
use super::*;
use super::{Server as S, Client as C};
type Server = S<NoUpgrades>;
type Client = C<NoUpgrades>;
#[test]
fn test_parse_request() {
@@ -1033,8 +997,6 @@ mod tests {
#[test]
fn test_decoder_request() {
use super::Decoder;
fn parse(s: &str) -> ParsedMessage<RequestLine> {
let mut bytes = BytesMut::from(s);
Server::parse(&mut bytes, ParseContext {
@@ -1058,39 +1020,39 @@ mod tests {
assert_eq!(parse("\
GET / HTTP/1.1\r\n\
\r\n\
").decode, Decode::Normal(Decoder::length(0)));
").decode, DecodedLength::ZERO);
assert_eq!(parse("\
POST / HTTP/1.1\r\n\
\r\n\
").decode, Decode::Normal(Decoder::length(0)));
").decode, DecodedLength::ZERO);
// transfer-encoding: chunked
assert_eq!(parse("\
POST / HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\
\r\n\
").decode, Decode::Normal(Decoder::chunked()));
").decode, DecodedLength::CHUNKED);
assert_eq!(parse("\
POST / HTTP/1.1\r\n\
transfer-encoding: gzip, chunked\r\n\
\r\n\
").decode, Decode::Normal(Decoder::chunked()));
").decode, DecodedLength::CHUNKED);
assert_eq!(parse("\
POST / HTTP/1.1\r\n\
transfer-encoding: gzip\r\n\
transfer-encoding: chunked\r\n\
\r\n\
").decode, Decode::Normal(Decoder::chunked()));
").decode, DecodedLength::CHUNKED);
// content-length
assert_eq!(parse("\
POST / HTTP/1.1\r\n\
content-length: 10\r\n\
\r\n\
").decode, Decode::Normal(Decoder::length(10)));
").decode, DecodedLength::new(10));
// transfer-encoding and content-length = chunked
assert_eq!(parse("\
@@ -1098,14 +1060,14 @@ mod tests {
content-length: 10\r\n\
transfer-encoding: chunked\r\n\
\r\n\
").decode, Decode::Normal(Decoder::chunked()));
").decode, DecodedLength::CHUNKED);
assert_eq!(parse("\
POST / HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\
content-length: 10\r\n\
\r\n\
").decode, Decode::Normal(Decoder::chunked()));
").decode, DecodedLength::CHUNKED);
assert_eq!(parse("\
POST / HTTP/1.1\r\n\
@@ -1113,7 +1075,7 @@ mod tests {
content-length: 10\r\n\
transfer-encoding: chunked\r\n\
\r\n\
").decode, Decode::Normal(Decoder::chunked()));
").decode, DecodedLength::CHUNKED);
// multiple content-lengths of same value are fine
@@ -1122,7 +1084,7 @@ mod tests {
content-length: 10\r\n\
content-length: 10\r\n\
\r\n\
").decode, Decode::Normal(Decoder::length(10)));
").decode, DecodedLength::new(10));
// multiple content-lengths with different values is an error
@@ -1153,7 +1115,7 @@ mod tests {
POST / HTTP/1.0\r\n\
content-length: 10\r\n\
\r\n\
").decode, Decode::Normal(Decoder::length(10)));
").decode, DecodedLength::new(10));
// 1.0 doesn't understand chunked, so its an error
@@ -1171,6 +1133,16 @@ mod tests {
parse_with_method(s, Method::GET)
}
fn parse_ignores(s: &str) {
let mut bytes = BytesMut::from(s);
assert!(Client::parse(&mut bytes, ParseContext {
cached_headers: &mut None,
req_method: &mut Some(Method::GET),
})
.expect("parse ok")
.is_none())
}
fn parse_with_method(s: &str, m: Method) -> ParsedMessage<StatusCode> {
let mut bytes = BytesMut::from(s);
Client::parse(&mut bytes, ParseContext {
@@ -1195,32 +1167,32 @@ mod tests {
assert_eq!(parse("\
HTTP/1.1 200 OK\r\n\
\r\n\
").decode, Decode::Normal(Decoder::eof()));
").decode, DecodedLength::CLOSE_DELIMITED);
// 204 and 304 never have a body
assert_eq!(parse("\
HTTP/1.1 204 No Content\r\n\
\r\n\
").decode, Decode::Normal(Decoder::length(0)));
").decode, DecodedLength::ZERO);
assert_eq!(parse("\
HTTP/1.1 304 Not Modified\r\n\
\r\n\
").decode, Decode::Normal(Decoder::length(0)));
").decode, DecodedLength::ZERO);
// content-length
assert_eq!(parse("\
HTTP/1.1 200 OK\r\n\
content-length: 8\r\n\
\r\n\
").decode, Decode::Normal(Decoder::length(8)));
").decode, DecodedLength::new(8));
assert_eq!(parse("\
HTTP/1.1 200 OK\r\n\
content-length: 8\r\n\
content-length: 8\r\n\
\r\n\
").decode, Decode::Normal(Decoder::length(8)));
").decode, DecodedLength::new(8));
parse_err("\
HTTP/1.1 200 OK\r\n\
@@ -1235,7 +1207,7 @@ mod tests {
HTTP/1.1 200 OK\r\n\
transfer-encoding: chunked\r\n\
\r\n\
").decode, Decode::Normal(Decoder::chunked()));
").decode, DecodedLength::CHUNKED);
// transfer-encoding and content-length = chunked
assert_eq!(parse("\
@@ -1243,7 +1215,7 @@ mod tests {
content-length: 10\r\n\
transfer-encoding: chunked\r\n\
\r\n\
").decode, Decode::Normal(Decoder::chunked()));
").decode, DecodedLength::CHUNKED);
// HEAD can have content-length, but not body
@@ -1251,44 +1223,54 @@ mod tests {
HTTP/1.1 200 OK\r\n\
content-length: 8\r\n\
\r\n\
", Method::HEAD).decode, Decode::Normal(Decoder::length(0)));
", Method::HEAD).decode, DecodedLength::ZERO);
// CONNECT with 200 never has body
assert_eq!(parse_with_method("\
HTTP/1.1 200 OK\r\n\
\r\n\
", Method::CONNECT).decode, Decode::Final(Decoder::length(0)));
{
let msg = parse_with_method("\
HTTP/1.1 200 OK\r\n\
\r\n\
", Method::CONNECT);
assert_eq!(msg.decode, DecodedLength::ZERO);
assert!(!msg.keep_alive, "should be upgrade");
assert!(msg.wants_upgrade, "should be upgrade");
}
// CONNECT receiving non 200 can have a body
assert_eq!(parse_with_method("\
HTTP/1.1 400 Bad Request\r\n\
\r\n\
", Method::CONNECT).decode, Decode::Normal(Decoder::eof()));
", Method::CONNECT).decode, DecodedLength::CLOSE_DELIMITED);
// 1xx status codes
assert_eq!(parse("\
parse_ignores("\
HTTP/1.1 100 Continue\r\n\
\r\n\
").decode, Decode::Ignore);
");
assert_eq!(parse("\
parse_ignores("\
HTTP/1.1 103 Early Hints\r\n\
\r\n\
").decode, Decode::Ignore);
");
// 101 upgrade not supported yet
parse_err("\
HTTP/1.1 101 Switching Protocols\r\n\
\r\n\
");
{
let msg = parse("\
HTTP/1.1 101 Switching Protocols\r\n\
\r\n\
");
assert_eq!(msg.decode, DecodedLength::ZERO);
assert!(!msg.keep_alive, "should be last");
assert!(msg.wants_upgrade, "should be upgrade");
}
// http/1.0
assert_eq!(parse("\
HTTP/1.0 200 OK\r\n\
\r\n\
").decode, Decode::Normal(Decoder::eof()));
").decode, DecodedLength::CLOSE_DELIMITED);
// 1.0 doesn't understand chunked
parse_err("\
@@ -1320,28 +1302,11 @@ mod tests {
}
#[test]
fn test_server_no_upgrades_connect_method() {
fn test_server_encode_connect_method() {
let mut head = MessageHead::default();
let mut vec = Vec::new();
let err = Server::encode(Encode {
head: &mut head,
body: None,
keep_alive: true,
req_method: &mut Some(Method::CONNECT),
title_case_headers: false,
}, &mut vec).unwrap_err();
assert!(err.is_user());
assert_eq!(err.kind(), &::error::Kind::UnsupportedRequestMethod);
}
#[test]
fn test_server_yes_upgrades_connect_method() {
let mut head = MessageHead::default();
let mut vec = Vec::new();
let encoder = S::<YesUpgrades>::encode(Encode {
let encoder = Server::encode(Encode {
head: &mut head,
body: None,
keep_alive: true,
@@ -1382,10 +1347,12 @@ mod tests {
b.bytes = len as u64;
b.iter(|| {
let msg = Server::parse(&mut raw, ParseContext {
let mut msg = Server::parse(&mut raw, ParseContext {
cached_headers: &mut headers,
req_method: &mut None,
}).unwrap().unwrap();
::test::black_box(&msg);
msg.head.headers.clear();
headers = Some(msg.head.headers);
restart(&mut raw, len);
});
@@ -1402,18 +1369,19 @@ mod tests {
#[cfg(feature = "nightly")]
#[bench]
fn bench_parse_short(b: &mut Bencher) {
let mut raw = BytesMut::from(
b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n".to_vec()
);
let s = &b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n"[..];
let mut raw = BytesMut::from(s.to_vec());
let len = raw.len();
let mut headers = Some(HeaderMap::new());
b.bytes = len as u64;
b.iter(|| {
let msg = Server::parse(&mut raw, ParseContext {
let mut msg = Server::parse(&mut raw, ParseContext {
cached_headers: &mut headers,
req_method: &mut None,
}).unwrap().unwrap();
::test::black_box(&msg);
msg.head.headers.clear();
headers = Some(msg.head.headers);
restart(&mut raw, len);
});
@@ -1480,3 +1448,4 @@ mod tests {
})
}
}

View File

@@ -8,6 +8,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use body::Payload;
use ::common::{Exec, Never};
use headers;
use ::proto::Dispatched;
use super::{PipeToSendStream, SendBuf};
use ::{Body, Request, Response};
@@ -16,7 +17,7 @@ type ClientRx<B> = ::client::dispatch::Receiver<Request<B>, Response<Body>>;
/// other handles to it have been dropped, so that it can shutdown.
type ConnDropRef = mpsc::Sender<Never>;
pub struct Client<T, B>
pub(crate) struct Client<T, B>
where
B: Payload,
{
@@ -54,7 +55,7 @@ where
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
{
type Item = ();
type Item = Dispatched;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@@ -153,7 +154,7 @@ where
Ok(Async::Ready(None)) |
Err(_) => {
trace!("client::dispatch::Sender dropped");
return Ok(Async::Ready(()));
return Ok(Async::Ready(Dispatched::Shutdown));
}
}
},

View File

@@ -7,6 +7,7 @@ use ::body::Payload;
use ::common::Exec;
use ::headers;
use ::service::Service;
use ::proto::Dispatched;
use super::{PipeToSendStream, SendBuf};
use ::{Body, Response};
@@ -82,7 +83,7 @@ where
S::Future: Send + 'static,
B: Payload,
{
type Item = ();
type Item = Dispatched;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@@ -95,12 +96,13 @@ where
})
},
State::Serving(ref mut srv) => {
return srv.poll_server(&mut self.service, &self.exec);
try_ready!(srv.poll_server(&mut self.service, &self.exec));
return Ok(Async::Ready(Dispatched::Shutdown));
}
State::Closed => {
// graceful_shutdown was called before handshaking finished,
// nothing to do here...
return Ok(Async::Ready(()));
return Ok(Async::Ready(Dispatched::Shutdown));
}
};
self.state = next;

View File

@@ -1,12 +1,12 @@
//! Pieces pertaining to the HTTP message protocol.
use http::{HeaderMap, Method, StatusCode, Uri, Version};
pub(crate) use self::h1::{dispatch, Conn, ClientTransaction, ClientUpgradeTransaction, ServerTransaction};
pub(crate) use self::h1::{dispatch, Conn, ServerTransaction};
use self::body_length::DecodedLength;
pub(crate) mod h1;
pub(crate) mod h2;
/// An Incoming Message head. Includes request/status line, and headers.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct MessageHead<S> {
@@ -27,34 +27,6 @@ pub struct RequestLine(pub Method, pub Uri);
/// An incoming response message.
pub type ResponseHead = MessageHead<StatusCode>;
/*
impl<S> MessageHead<S> {
pub fn should_keep_alive(&self) -> bool {
should_keep_alive(self.version, &self.headers)
}
pub fn expecting_continue(&self) -> bool {
expecting_continue(self.version, &self.headers)
}
}
/// Checks if a connection should be kept alive.
#[inline]
pub fn should_keep_alive(version: Version, headers: &HeaderMap) -> bool {
if version == Version::HTTP_10 {
headers::connection_keep_alive(headers)
} else {
!headers::connection_close(headers)
}
}
/// Checks if a connection is expecting a `100 Continue` before sending its body.
#[inline]
pub fn expecting_continue(version: Version, headers: &HeaderMap) -> bool {
version == Version::HTTP_11 && headers::expect_continue(headers)
}
*/
#[derive(Debug)]
pub enum BodyLength {
/// Content-Length
@@ -63,32 +35,72 @@ pub enum BodyLength {
Unknown,
}
/*
#[test]
fn test_should_keep_alive() {
let mut headers = HeaderMap::new();
assert!(!should_keep_alive(Version::HTTP_10, &headers));
assert!(should_keep_alive(Version::HTTP_11, &headers));
headers.insert("connection", ::http::header::HeaderValue::from_static("close"));
assert!(!should_keep_alive(Version::HTTP_10, &headers));
assert!(!should_keep_alive(Version::HTTP_11, &headers));
headers.insert("connection", ::http::header::HeaderValue::from_static("keep-alive"));
assert!(should_keep_alive(Version::HTTP_10, &headers));
assert!(should_keep_alive(Version::HTTP_11, &headers));
/// Status of when an Disaptcher future completes.
pub(crate) enum Dispatched {
/// Dispatcher completely shutdown connection.
Shutdown,
/// Dispatcher has pending upgrade, and so did not shutdown.
Upgrade(::upgrade::Pending),
}
#[test]
fn test_expecting_continue() {
let mut headers = HeaderMap::new();
/// A separate module to encapsulate the invariants of the DecodedLength type.
mod body_length {
use std::fmt;
assert!(!expecting_continue(Version::HTTP_10, &headers));
assert!(!expecting_continue(Version::HTTP_11, &headers));
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct DecodedLength(u64);
headers.insert("expect", ::http::header::HeaderValue::from_static("100-continue"));
assert!(!expecting_continue(Version::HTTP_10, &headers));
assert!(expecting_continue(Version::HTTP_11, &headers));
const MAX_LEN: u64 = ::std::u64::MAX - 2;
impl DecodedLength {
pub(crate) const CLOSE_DELIMITED: DecodedLength = DecodedLength(::std::u64::MAX);
pub(crate) const CHUNKED: DecodedLength = DecodedLength(::std::u64::MAX - 1);
pub(crate) const ZERO: DecodedLength = DecodedLength(0);
#[cfg(test)]
pub(crate) fn new(len: u64) -> Self {
debug_assert!(len <= MAX_LEN);
DecodedLength(len)
}
/// Takes the length as a content-length without other checks.
///
/// Should only be called if previously confirmed this isn't
/// CLOSE_DELIMITED or CHUNKED.
#[inline]
pub(crate) fn danger_len(self) -> u64 {
debug_assert!(self.0 < Self::CHUNKED.0);
self.0
}
/// Converts to an Option<u64> representing a Known or Unknown length.
pub(crate) fn into_opt(self) -> Option<u64> {
match self {
DecodedLength::CHUNKED |
DecodedLength::CLOSE_DELIMITED => None,
DecodedLength(known) => Some(known)
}
}
/// Checks the `u64` is within the maximum allowed for content-length.
pub(crate) fn checked_new(len: u64) -> Result<Self, ::error::Parse> {
if len <= MAX_LEN {
Ok(DecodedLength(len))
} else {
warn!("content-length bigger than maximum: {} > {}", len, MAX_LEN);
Err(::error::Parse::TooLarge)
}
}
}
impl fmt::Display for DecodedLength {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
DecodedLength::CLOSE_DELIMITED => f.write_str("close-delimited"),
DecodedLength::CHUNKED => f.write_str("chunked encoding"),
DecodedLength::ZERO => f.write_str("empty"),
DecodedLength(n) => write!(f, "content-length ({} bytes)", n),
}
}
}
}
*/