style(lib): address most clippy lints

This commit is contained in:
danieleades
2020-01-03 17:40:32 +00:00
committed by Sean McArthur
parent 0f13719873
commit 0eaf304644
29 changed files with 162 additions and 200 deletions

View File

@@ -292,7 +292,7 @@ impl Opts {
} else { } else {
self.request_body self.request_body
.map(Body::from) .map(Body::from)
.unwrap_or_else(|| Body::empty()) .unwrap_or_else(Body::empty)
}; };
let mut req = Request::new(body); let mut req = Request::new(body);
*req.method_mut() = self.request_method.clone(); *req.method_mut() = self.request_method.clone();
@@ -355,5 +355,5 @@ fn spawn_server(rt: &mut tokio::runtime::Runtime, opts: &Opts) -> SocketAddr {
panic!("server error: {}", err); panic!("server error: {}", err);
} }
}); });
return addr; addr
} }

View File

@@ -105,7 +105,7 @@ fn throughput_fixedsize_large_payload(b: &mut test::Bencher) {
#[bench] #[bench]
fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) { fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("content-length", "1000000"), || { bench_server!(b, ("content-length", "1000000"), || {
static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s))) Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
}) })
} }
@@ -127,7 +127,7 @@ fn throughput_chunked_large_payload(b: &mut test::Bencher) {
#[bench] #[bench]
fn throughput_chunked_many_chunks(b: &mut test::Bencher) { fn throughput_chunked_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("transfer-encoding", "chunked"), || { bench_server!(b, ("transfer-encoding", "chunked"), || {
static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s))) Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
}) })
} }

View File

@@ -5,8 +5,8 @@ use futures_util::future::join;
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server}; use hyper::{Body, Request, Response, Server};
static INDEX1: &'static [u8] = b"The 1st service!"; static INDEX1: &[u8] = b"The 1st service!";
static INDEX2: &'static [u8] = b"The 2nd service!"; static INDEX2: &[u8] = b"The 2nd service!";
async fn index1(_: Request<Body>) -> Result<Response<Body>, hyper::Error> { async fn index1(_: Request<Body>) -> Result<Response<Body>, hyper::Error> {
Ok(Response::new(Body::from(INDEX1))) Ok(Response::new(Body::from(INDEX1)))

View File

@@ -71,5 +71,5 @@ async fn simple_file_send(filename: &str) -> Result<Response<Body>> {
return Ok(internal_server_error()); return Ok(internal_server_error());
} }
return Ok(not_found()); Ok(not_found())
} }

View File

@@ -6,7 +6,7 @@ use futures_util::future;
use hyper::service::Service; use hyper::service::Service;
use hyper::{Body, Request, Response, Server}; use hyper::{Body, Request, Response, Server};
const ROOT: &'static str = "/"; const ROOT: &str = "/";
#[derive(Debug)] #[derive(Debug)]
pub struct Svc; pub struct Svc;

View File

@@ -112,10 +112,7 @@ impl Body {
let (tx, rx) = mpsc::channel(0); let (tx, rx) = mpsc::channel(0);
let (abort_tx, abort_rx) = oneshot::channel(); let (abort_tx, abort_rx) = oneshot::channel();
let tx = Sender { let tx = Sender { abort_tx, tx };
abort_tx: abort_tx,
tx: tx,
};
let rx = Body::new(Kind::Chan { let rx = Body::new(Kind::Chan {
content_length, content_length,
abort_rx, abort_rx,
@@ -131,7 +128,6 @@ impl Body {
/// ///
/// ``` /// ```
/// # use hyper::Body; /// # use hyper::Body;
/// # fn main() {
/// let chunks: Vec<Result<_, ::std::io::Error>> = vec![ /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![
/// Ok("hello"), /// Ok("hello"),
/// Ok(" "), /// Ok(" "),
@@ -141,7 +137,6 @@ impl Body {
/// let stream = futures_util::stream::iter(chunks); /// let stream = futures_util::stream::iter(chunks);
/// ///
/// let body = Body::wrap_stream(stream); /// let body = Body::wrap_stream(stream);
/// # }
/// ``` /// ```
/// ///
/// # Optional /// # Optional
@@ -169,10 +164,7 @@ impl Body {
} }
fn new(kind: Kind) -> Body { fn new(kind: Kind) -> Body {
Body { Body { kind, extra: None }
kind: kind,
extra: None,
}
} }
pub(crate) fn h2(recv: h2::RecvStream, content_length: Option<u64>) -> Self { pub(crate) fn h2(recv: h2::RecvStream, content_length: Option<u64>) -> Self {
@@ -253,7 +245,7 @@ impl Body {
Some(chunk) => { Some(chunk) => {
if let Some(ref mut len) = *len { if let Some(ref mut len) = *len {
debug_assert!(*len >= chunk.len() as u64); debug_assert!(*len >= chunk.len() as u64);
*len = *len - chunk.len() as u64; *len -= chunk.len() as u64;
} }
Poll::Ready(Some(Ok(chunk))) Poll::Ready(Some(Ok(chunk)))
} }

View File

@@ -33,9 +33,8 @@ pub trait Payload: sealed::Sealed + Send + 'static {
/// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2. /// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2.
fn poll_trailers( fn poll_trailers(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut task::Context<'_>, _cx: &mut task::Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> { ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
drop(cx);
Poll::Ready(Ok(None)) Poll::Ready(Ok(None))
} }

View File

@@ -345,8 +345,8 @@ where
}; };
Parts { Parts {
io: io, io,
read_buf: read_buf, read_buf,
_inner: (), _inner: (),
} }
} }
@@ -363,9 +363,9 @@ where
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
/// to work with this function; or use the `without_shutdown` wrapper. /// to work with this function; or use the `without_shutdown` wrapper.
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
match self.inner.as_mut().expect("already upgraded") { match *self.inner.as_mut().expect("already upgraded") {
&mut ProtoClient::H1(ref mut h1) => h1.poll_without_shutdown(cx), ProtoClient::H1(ref mut h1) => h1.poll_without_shutdown(cx),
&mut ProtoClient::H2(ref mut h2) => Pin::new(h2).poll(cx).map_ok(|_| ()), ProtoClient::H2(ref mut h2) => Pin::new(h2).poll(cx).map_ok(|_| ()),
} }
} }

View File

@@ -542,7 +542,7 @@ impl ConnectingTcpRemote {
} }
} }
return Err(err.take().expect("missing connect error")); Err(err.take().expect("missing connect error"))
} }
} }
@@ -552,9 +552,9 @@ fn connect(
reuse_address: bool, reuse_address: bool,
connect_timeout: Option<Duration>, connect_timeout: Option<Duration>,
) -> io::Result<impl Future<Output = io::Result<TcpStream>>> { ) -> io::Result<impl Future<Output = io::Result<TcpStream>>> {
let builder = match addr { let builder = match *addr {
&SocketAddr::V4(_) => TcpBuilder::new_v4()?, SocketAddr::V4(_) => TcpBuilder::new_v4()?,
&SocketAddr::V6(_) => TcpBuilder::new_v6()?, SocketAddr::V6(_) => TcpBuilder::new_v6()?,
}; };
if reuse_address { if reuse_address {
@@ -566,9 +566,9 @@ fn connect(
builder.bind(SocketAddr::new(local_addr.clone(), 0))?; builder.bind(SocketAddr::new(local_addr.clone(), 0))?;
} else if cfg!(windows) { } else if cfg!(windows) {
// Windows requires a socket be bound before calling connect // Windows requires a socket be bound before calling connect
let any: SocketAddr = match addr { let any: SocketAddr = match *addr {
&SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(), SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(),
&SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(), SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(),
}; };
builder.bind(any)?; builder.bind(any)?;
} }
@@ -619,7 +619,7 @@ impl ConnectingTcp {
} }
}; };
if let Err(_) = result { if result.is_err() {
// Fallback to the remaining future (could be preferred or fallback) // Fallback to the remaining future (could be preferred or fallback)
// if we get an error // if we get an error
future.await future.await

View File

@@ -12,13 +12,10 @@ pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (giver, taker) = want::new(); let (giver, taker) = want::new();
let tx = Sender { let tx = Sender {
buffered_once: false, buffered_once: false,
giver: giver, giver,
inner: tx, inner: tx,
}; };
let rx = Receiver { let rx = Receiver { inner: rx, taker };
inner: rx,
taker: taker,
};
(tx, rx) (tx, rx)
} }
@@ -183,7 +180,7 @@ struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
impl<T, U> Drop for Envelope<T, U> { impl<T, U> Drop for Envelope<T, U> {
fn drop(&mut self) { fn drop(&mut self) {
if let Some((val, cb)) = self.0.take() { if let Some((val, cb)) = self.0.take() {
let _ = cb.send(Err(( cb.send(Err((
crate::Error::new_canceled().with("connection closed"), crate::Error::new_canceled().with("connection closed"),
Some(val), Some(val),
))); )));

View File

@@ -210,7 +210,7 @@ where
/// # fn main() {} /// # fn main() {}
/// ``` /// ```
pub fn request(&self, mut req: Request<B>) -> ResponseFuture { pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
let is_http_connect = req.method() == &Method::CONNECT; let is_http_connect = req.method() == Method::CONNECT;
match req.version() { match req.version() {
Version::HTTP_11 => (), Version::HTTP_11 => (),
Version::HTTP_10 => { Version::HTTP_10 => {
@@ -237,7 +237,7 @@ where
} }
}; };
let pool_key = Arc::new(domain.to_string()); let pool_key = Arc::new(domain);
ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key))) ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key)))
} }
@@ -302,14 +302,14 @@ where
} }
// CONNECT always sends authority-form, so check it first... // CONNECT always sends authority-form, so check it first...
if req.method() == &Method::CONNECT { if req.method() == Method::CONNECT {
authority_form(req.uri_mut()); authority_form(req.uri_mut());
} else if pooled.conn_info.is_proxied { } else if pooled.conn_info.is_proxied {
absolute_form(req.uri_mut()); absolute_form(req.uri_mut());
} else { } else {
origin_form(req.uri_mut()); origin_form(req.uri_mut());
}; };
} else if req.method() == &Method::CONNECT { } else if req.method() == Method::CONNECT {
debug!("client does not support CONNECT requests over HTTP2"); debug!("client does not support CONNECT requests over HTTP2");
return Either::Left(future::err(ClientError::Normal( return Either::Left(future::err(ClientError::Normal(
crate::Error::new_user_unsupported_request_method(), crate::Error::new_user_unsupported_request_method(),
@@ -422,7 +422,7 @@ where
}); });
// An execute error here isn't important, we're just trying // An execute error here isn't important, we're just trying
// to prevent a waste of a socket... // to prevent a waste of a socket...
let _ = executor.execute(bg); executor.execute(bg);
} }
Either::Left(future::ok(checked_out)) Either::Left(future::ok(checked_out))
} }

View File

@@ -352,7 +352,7 @@ impl<T: Poolable> PoolInner<T> {
Some(value) => { Some(value) => {
// borrow-check scope... // borrow-check scope...
{ {
let idle_list = self.idle.entry(key.clone()).or_insert(Vec::new()); let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
if self.max_idle_per_host <= idle_list.len() { if self.max_idle_per_host <= idle_list.len() {
trace!("max idle per host for {:?}, dropping connection", key); trace!("max idle per host for {:?}, dropping connection", key);
return; return;
@@ -360,7 +360,7 @@ impl<T: Poolable> PoolInner<T> {
debug!("pooling idle connection for {:?}", key); debug!("pooling idle connection for {:?}", key);
idle_list.push(Idle { idle_list.push(Idle {
value: value, value,
idle_at: Instant::now(), idle_at: Instant::now(),
}); });
} }
@@ -610,7 +610,7 @@ impl<T: Poolable> Checkout<T> {
inner inner
.waiters .waiters
.entry(self.key.clone()) .entry(self.key.clone())
.or_insert(VecDeque::new()) .or_insert_with(VecDeque::new)
.push_back(tx); .push_back(tx);
// register the waker with this oneshot // register the waker with this oneshot

View File

@@ -34,10 +34,7 @@ impl<T: Buf> Buf for BufList<T> {
#[inline] #[inline]
fn bytes(&self) -> &[u8] { fn bytes(&self) -> &[u8] {
for buf in &self.bufs { self.bufs.front().map(Buf::bytes).unwrap_or_default()
return buf.bytes();
}
&[]
} }
#[inline] #[inline]

View File

@@ -58,11 +58,11 @@ where
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
if let Some(mut prefix) = self.pre.take() { if let Some(mut prefix) = self.pre.take() {
// If there are no remaining bytes, let the bytes get dropped. // If there are no remaining bytes, let the bytes get dropped.
if prefix.len() > 0 { if !prefix.is_empty() {
let copy_len = cmp::min(prefix.len(), buf.len()); let copy_len = cmp::min(prefix.len(), buf.len());
prefix.copy_to_slice(&mut buf[..copy_len]); prefix.copy_to_slice(&mut buf[..copy_len]);
// Put back whats left // Put back whats left
if prefix.len() > 0 { if !prefix.is_empty() {
self.pre = Some(prefix); self.pre = Some(prefix);
} }

View File

@@ -49,9 +49,8 @@ where
type Output = R::Output; type Output = R::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.inner { if let Inner::Fut(ref mut f) = self.inner {
Inner::Fut(ref mut f) => return Pin::new(f).poll(cx), return Pin::new(f).poll(cx);
_ => (),
} }
match mem::replace(&mut self.inner, Inner::Empty) { match mem::replace(&mut self.inner, Inner::Empty) {

View File

@@ -96,7 +96,7 @@ pub fn is_chunked_(value: &HeaderValue) -> bool {
} }
pub fn add_chunked(mut entry: OccupiedEntry<'_, HeaderValue>) { pub fn add_chunked(mut entry: OccupiedEntry<'_, HeaderValue>) {
const CHUNKED: &'static str = "chunked"; const CHUNKED: &str = "chunked";
if let Some(line) = entry.iter_mut().next_back() { if let Some(line) = entry.iter_mut().next_back() {
// + 2 for ", " // + 2 for ", "

View File

@@ -13,7 +13,7 @@ use crate::common::{task, Pin, Poll, Unpin};
use crate::headers::connection_keep_alive; use crate::headers::connection_keep_alive;
use crate::proto::{BodyLength, DecodedLength, MessageHead}; use crate::proto::{BodyLength, DecodedLength, MessageHead};
const H2_PREFACE: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
/// This handles a connection, which will have been established over an /// This handles a connection, which will have been established over an
/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple /// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
@@ -391,9 +391,8 @@ where
pub fn can_write_head(&self) -> bool { pub fn can_write_head(&self) -> bool {
if !T::should_read_first() { if !T::should_read_first() {
match self.state.reading { if let Reading::Closed = self.state.reading {
Reading::Closed => return false, return false;
_ => {}
} }
} }
match self.state.writing { match self.state.writing {
@@ -486,7 +485,7 @@ where
let outgoing_is_keep_alive = head let outgoing_is_keep_alive = head
.headers .headers
.get(CONNECTION) .get(CONNECTION)
.and_then(|value| Some(connection_keep_alive(value))) .map(connection_keep_alive)
.unwrap_or(false); .unwrap_or(false);
if !outgoing_is_keep_alive { if !outgoing_is_keep_alive {
@@ -510,20 +509,16 @@ where
// If we know the remote speaks an older version, we try to fix up any messages // If we know the remote speaks an older version, we try to fix up any messages
// to work with our older peer. // to work with our older peer.
fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) { fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
match self.state.version { if let Version::HTTP_10 = self.state.version {
Version::HTTP_10 => { // Fixes response or connection when keep-alive header is not present
// Fixes response or connection when keep-alive header is not present self.fix_keep_alive(head);
self.fix_keep_alive(head); // If the remote only knows HTTP/1.0, we should force ourselves
// If the remote only knows HTTP/1.0, we should force ourselves // to do only speak HTTP/1.0 as well.
// to do only speak HTTP/1.0 as well. head.version = Version::HTTP_10;
head.version = Version::HTTP_10;
}
_ => {
// If the remote speaks HTTP/1.1, then it *should* be fine with
// both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
// the user's headers be.
}
} }
// If the remote speaks HTTP/1.1, then it *should* be fine with
// both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
// the user's headers be.
} }
pub fn write_body(&mut self, chunk: B) { pub fn write_body(&mut self, chunk: B) {
@@ -603,21 +598,18 @@ where
// - Client: there is nothing we can do // - Client: there is nothing we can do
// - Server: if Response hasn't been written yet, we can send a 4xx response // - Server: if Response hasn't been written yet, we can send a 4xx response
fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> { fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
match self.state.writing { if let Writing::Init = self.state.writing {
Writing::Init => { if self.has_h2_prefix() {
if self.has_h2_prefix() { return Err(crate::Error::new_version_h2());
return Err(crate::Error::new_version_h2()); }
} if let Some(msg) = T::on_error(&err) {
if let Some(msg) = T::on_error(&err) { // Drop the cached headers so as to not trigger a debug
// Drop the cached headers so as to not trigger a debug // assert in `write_head`...
// assert in `write_head`... self.state.cached_headers.take();
self.state.cached_headers.take(); self.write_head(msg, None);
self.write_head(msg, None); self.state.error = Some(err);
self.state.error = Some(err); return Ok(());
return Ok(());
}
} }
_ => (),
} }
// fallback is pass the error back up // fallback is pass the error back up

View File

@@ -62,8 +62,8 @@ where
{ {
pub fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self { pub fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
Dispatcher { Dispatcher {
conn: conn, conn,
dispatch: dispatch, dispatch,
body_tx: None, body_tx: None,
body_rx: Box::pin(None), body_rx: Box::pin(None),
is_closing: false, is_closing: false,
@@ -443,7 +443,7 @@ where
pub fn new(service: S) -> Server<S, B> { pub fn new(service: S) -> Server<S, B> {
Server { Server {
in_flight: Box::pin(None), in_flight: Box::pin(None),
service: service, service,
} }
} }
@@ -580,7 +580,7 @@ where
*res.status_mut() = msg.subject; *res.status_mut() = msg.subject;
*res.headers_mut() = msg.headers; *res.headers_mut() = msg.headers;
*res.version_mut() = msg.version; *res.version_mut() = msg.version;
let _ = cb.send(Ok(res)); cb.send(Ok(res));
Ok(()) Ok(())
} else { } else {
// Getting here is likely a bug! An error should have happened // Getting here is likely a bug! An error should have happened
@@ -591,7 +591,7 @@ where
} }
Err(err) => { Err(err) => {
if let Some(cb) = self.callback.take() { if let Some(cb) = self.callback.take() {
let _ = cb.send(Err((err, None))); cb.send(Err((err, None)));
Ok(()) Ok(())
} else if !self.rx_closed { } else if !self.rx_closed {
self.rx.close(); self.rx.close();
@@ -599,7 +599,7 @@ where
trace!("canceling queued request with connection error: {}", err); trace!("canceling queued request with connection error: {}", err);
// in this case, the message was never even started, so it's safe to tell // in this case, the message was never even started, so it's safe to tell
// the user that the request was completely canceled // the user that the request was completely canceled
let _ = cb.send(Err((crate::Error::new_canceled().with(err), Some(req)))); cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
Ok(()) Ok(())
} else { } else {
Err(err) Err(err)

View File

@@ -49,7 +49,7 @@ enum BufKind<B> {
impl Encoder { impl Encoder {
fn new(kind: Kind) -> Encoder { fn new(kind: Kind) -> Encoder {
Encoder { Encoder {
kind: kind, kind,
is_last: false, is_last: false,
} }
} }
@@ -307,7 +307,7 @@ impl fmt::Write for ChunkSize {
fn write_str(&mut self, num: &str) -> fmt::Result { fn write_str(&mut self, num: &str) -> fmt::Result {
use std::io::Write; use std::io::Write;
(&mut self.bytes[self.len.into()..]) (&mut self.bytes[self.len.into()..])
.write(num.as_bytes()) .write_all(num.as_bytes())
.expect("&mut [u8].write() cannot error"); .expect("&mut [u8].write() cannot error");
self.len += num.len() as u8; // safe because bytes is never bigger than 256 self.len += num.len() as u8; // safe because bytes is never bigger than 256
Ok(()) Ok(())

View File

@@ -57,7 +57,7 @@ where
pub fn new(io: T) -> Buffered<T, B> { pub fn new(io: T) -> Buffered<T, B> {
Buffered { Buffered {
flush_pipeline: false, flush_pipeline: false,
io: io, io,
read_blocked: false, read_blocked: false,
read_buf: BytesMut::with_capacity(0), read_buf: BytesMut::with_capacity(0),
read_buf_strategy: ReadStrategy::default(), read_buf_strategy: ReadStrategy::default(),
@@ -168,12 +168,9 @@ where
} }
} }
} }
match ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? { if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {
0 => { trace!("parse eof");
trace!("parse eof"); return Poll::Ready(Err(crate::Error::new_incomplete()));
return Poll::Ready(Err(crate::Error::new_incomplete()));
}
_ => {}
} }
} }
} }
@@ -216,9 +213,8 @@ where
} else if self.write_buf.remaining() == 0 { } else if self.write_buf.remaining() == 0 {
Pin::new(&mut self.io).poll_flush(cx) Pin::new(&mut self.io).poll_flush(cx)
} else { } else {
match self.write_buf.strategy { if let WriteStrategy::Flatten = self.write_buf.strategy {
WriteStrategy::Flatten => return self.poll_flush_flattened(cx), return self.poll_flush_flattened(cx);
_ => (),
} }
loop { loop {
let n = let n =
@@ -325,35 +321,33 @@ impl ReadStrategy {
} }
fn record(&mut self, bytes_read: usize) { fn record(&mut self, bytes_read: usize) {
match *self { if let ReadStrategy::Adaptive {
ReadStrategy::Adaptive { ref mut decrease_now,
ref mut decrease_now, ref mut next,
ref mut next, max,
max, ..
.. } = *self
} => { {
if bytes_read >= *next { if bytes_read >= *next {
*next = cmp::min(incr_power_of_two(*next), max); *next = cmp::min(incr_power_of_two(*next), max);
*decrease_now = false; *decrease_now = false;
} else { } else {
let decr_to = prev_power_of_two(*next); let decr_to = prev_power_of_two(*next);
if bytes_read < decr_to { if bytes_read < decr_to {
if *decrease_now { if *decrease_now {
*next = cmp::max(decr_to, INIT_BUFFER_SIZE); *next = cmp::max(decr_to, INIT_BUFFER_SIZE);
*decrease_now = false;
} else {
// Decreasing is a two "record" process.
*decrease_now = true;
}
} else {
// A read within the current range should cancel
// a potential decrease, since we just saw proof
// that we still need this size.
*decrease_now = false; *decrease_now = false;
} else {
// Decreasing is a two "record" process.
*decrease_now = true;
} }
} else {
// A read within the current range should cancel
// a potential decrease, since we just saw proof
// that we still need this size.
*decrease_now = false;
} }
} }
_ => (),
} }
} }
} }
@@ -384,10 +378,7 @@ pub struct Cursor<T> {
impl<T: AsRef<[u8]>> Cursor<T> { impl<T: AsRef<[u8]>> Cursor<T> {
#[inline] #[inline]
pub(crate) fn new(bytes: T) -> Cursor<T> { pub(crate) fn new(bytes: T) -> Cursor<T> {
Cursor { Cursor { bytes, pos: 0 }
bytes: bytes,
pos: 0,
}
} }
} }
@@ -527,14 +518,15 @@ impl<B: Buf> Buf for WriteBuf<B> {
#[inline] #[inline]
fn advance(&mut self, cnt: usize) { fn advance(&mut self, cnt: usize) {
let hrem = self.headers.remaining(); let hrem = self.headers.remaining();
if hrem == cnt {
self.headers.reset(); match hrem.cmp(&cnt) {
} else if hrem > cnt { cmp::Ordering::Equal => self.headers.reset(),
self.headers.advance(cnt); cmp::Ordering::Greater => self.headers.advance(cnt),
} else { cmp::Ordering::Less => {
let qcnt = cnt - hrem; let qcnt = cnt - hrem;
self.headers.reset(); self.headers.reset();
self.queue.advance(qcnt); self.queue.advance(qcnt);
}
} }
} }
@@ -558,7 +550,7 @@ impl<'a, B: Buf> WriteBufAuto<'a, B> {
WriteBufAuto { WriteBufAuto {
bytes_called: Cell::new(false), bytes_called: Cell::new(false),
bytes_vec_called: Cell::new(false), bytes_vec_called: Cell::new(false),
inner: inner, inner,
} }
} }
} }

View File

@@ -472,10 +472,8 @@ impl Http1Transaction for Server {
continue 'headers; continue 'headers;
} }
header::CONNECTION => { header::CONNECTION => {
if !is_last { if !is_last && headers::connection_close(&value) {
if headers::connection_close(&value) { is_last = true;
is_last = true;
}
} }
if !is_name_written { if !is_name_written {
is_name_written = true; is_name_written = true;
@@ -594,9 +592,8 @@ impl Server {
} }
fn can_chunked(method: &Option<Method>, status: StatusCode) -> bool { fn can_chunked(method: &Option<Method>, status: StatusCode) -> bool {
if method == &Some(Method::HEAD) { if method == &Some(Method::HEAD) || method == &Some(Method::CONNECT) && status.is_success()
false {
} else if method == &Some(Method::CONNECT) && status.is_success() {
false false
} else { } else {
match status { match status {
@@ -766,7 +763,7 @@ impl Client {
101 => { 101 => {
return Ok(Some((DecodedLength::ZERO, true))); return Ok(Some((DecodedLength::ZERO, true)));
} }
100..=199 => { 100 | 102..=199 => {
trace!("ignoring informational response: {}", inc.subject.as_u16()); trace!("ignoring informational response: {}", inc.subject.as_u16());
return Ok(None); return Ok(None);
} }

View File

@@ -44,9 +44,10 @@ where
let (conn_drop_ref, rx) = mpsc::channel(1); let (conn_drop_ref, rx) = mpsc::channel(1);
let (cancel_tx, conn_eof) = oneshot::channel(); let (cancel_tx, conn_eof) = oneshot::channel();
let conn_drop_rx = rx.into_future().map(|(item, _rx)| match item { let conn_drop_rx = rx.into_future().map(|(item, _rx)| {
Some(never) => match never {}, if let Some(never) = item {
None => (), match never {}
}
}); });
let conn = conn.map_err(|e| debug!("connection error: {}", e)); let conn = conn.map_err(|e| debug!("connection error: {}", e));

View File

@@ -47,10 +47,8 @@ fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests"); warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
headers.remove(TE); headers.remove(TE);
} }
} else { } else if headers.remove(TE).is_some() {
if headers.remove(TE).is_some() { warn!("TE headers illegal in HTTP/2 responses");
warn!("TE headers illegal in HTTP/2 responses");
}
} }
if let Some(header) = headers.remove(CONNECTION) { if let Some(header) = headers.remove(CONNECTION) {
@@ -94,7 +92,7 @@ where
PipeToSendStream { PipeToSendStream {
body_tx: tx, body_tx: tx,
data_done: false, data_done: false,
stream: stream, stream,
} }
} }
} }
@@ -125,17 +123,15 @@ where
None => return Poll::Ready(Err(crate::Error::new_canceled())), None => return Poll::Ready(Err(crate::Error::new_canceled())),
} }
} }
} else { } else if let Poll::Ready(reason) = me
if let Poll::Ready(reason) = me .body_tx
.body_tx .poll_reset(cx)
.poll_reset(cx) .map_err(crate::Error::new_body_write)?
.map_err(crate::Error::new_body_write)? {
{ debug!("stream received RST_STREAM: {:?}", reason);
debug!("stream received RST_STREAM: {:?}", reason); return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from( reason,
reason, ))));
))));
}
} }
match ready!(me.stream.as_mut().poll_data(cx)) { match ready!(me.stream.as_mut().poll_data(cx)) {
@@ -172,7 +168,7 @@ where
if let Poll::Ready(reason) = me if let Poll::Ready(reason) = me
.body_tx .body_tx
.poll_reset(cx) .poll_reset(cx)
.map_err(|e| crate::Error::new_body_write(e))? .map_err(crate::Error::new_body_write)?
{ {
debug!("stream received RST_STREAM: {:?}", reason); debug!("stream received RST_STREAM: {:?}", reason);
return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from( return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
@@ -238,6 +234,8 @@ impl<B: Buf> Buf for SendBuf<B> {
#[inline] #[inline]
fn advance(&mut self, cnt: usize) { fn advance(&mut self, cnt: usize) {
self.0.as_mut().map(|b| b.advance(cnt)); if let Some(b) = self.0.as_mut() {
b.advance(cnt)
}
} }
} }

View File

@@ -259,10 +259,8 @@ where
Poll::Pending => { Poll::Pending => {
// Response is not yet ready, so we want to check if the client has sent a // Response is not yet ready, so we want to check if the client has sent a
// RST_STREAM frame which would cancel the current request. // RST_STREAM frame which would cancel the current request.
if let Poll::Ready(reason) = me if let Poll::Ready(reason) =
.reply me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
.poll_reset(cx)
.map_err(|e| crate::Error::new_h2(e))?
{ {
debug!("stream received RST_STREAM: {:?}", reason); debug!("stream received RST_STREAM: {:?}", reason);
return Poll::Ready(Err(crate::Error::new_h2(reason.into()))); return Poll::Ready(Err(crate::Error::new_h2(reason.into())));

View File

@@ -489,8 +489,8 @@ where
ProtoServer::H1(h1) => { ProtoServer::H1(h1) => {
let (io, read_buf, dispatch) = h1.into_inner(); let (io, read_buf, dispatch) = h1.into_inner();
Some(Parts { Some(Parts {
io: io, io,
read_buf: read_buf, read_buf,
service: dispatch.into_service(), service: dispatch.into_service(),
_inner: (), _inner: (),
}) })
@@ -522,7 +522,7 @@ where
ProtoServer::H2(ref mut h2) => return Pin::new(h2).poll(cx).map_ok(|_| ()), ProtoServer::H2(ref mut h2) => return Pin::new(h2).poll(cx).map_ok(|_| ()),
}; };
match ready!(polled) { match ready!(polled) {
Ok(x) => return Poll::Ready(Ok(x)), Ok(()) => return Poll::Ready(Ok(())),
Err(e) => match *e.kind() { Err(e) => match *e.kind() {
Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => { Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
self.upgrade_h2(); self.upgrade_h2();

View File

@@ -35,7 +35,7 @@ impl AddrIncoming {
let addr = listener.local_addr().map_err(crate::Error::new_listen)?; let addr = listener.local_addr().map_err(crate::Error::new_listen)?;
Ok(AddrIncoming { Ok(AddrIncoming {
listener, listener,
addr: addr, addr,
sleep_on_errors: true, sleep_on_errors: true,
tcp_keepalive_timeout: None, tcp_keepalive_timeout: None,
tcp_nodelay: false, tcp_nodelay: false,

View File

@@ -310,7 +310,7 @@ macro_rules! __client_req_header {
} }
} }
static REPLY_OK: &'static str = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"; static REPLY_OK: &str = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
test! { test! {
name: client_get, name: client_get,
@@ -1771,7 +1771,7 @@ mod dispatch_impl {
// so the unwrapped responses futures show it still worked. // so the unwrapped responses futures show it still worked.
assert_eq!(connects.load(Ordering::SeqCst), 3); assert_eq!(connects.load(Ordering::SeqCst), 3);
let res4 = client.get(url.clone()); let res4 = client.get(url);
rt.block_on(res4).unwrap(); rt.block_on(res4).unwrap();
assert_eq!( assert_eq!(
@@ -1800,8 +1800,8 @@ mod dispatch_impl {
fn with_http_and_closes(http: HttpConnector, closes: mpsc::Sender<()>) -> DebugConnector { fn with_http_and_closes(http: HttpConnector, closes: mpsc::Sender<()>) -> DebugConnector {
DebugConnector { DebugConnector {
http: http, http,
closes: closes, closes,
connects: Arc::new(AtomicUsize::new(0)), connects: Arc::new(AtomicUsize::new(0)),
is_proxy: false, is_proxy: false,
alpn_h2: false, alpn_h2: false,
@@ -2242,7 +2242,7 @@ mod conn {
let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); let tcp = rt.block_on(tcp_connect(&addr)).unwrap();
let io = DebugStream { let io = DebugStream {
tcp: tcp, tcp,
shutdown_called: false, shutdown_called: false,
}; };
@@ -2330,7 +2330,7 @@ mod conn {
let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); let tcp = rt.block_on(tcp_connect(&addr)).unwrap();
let io = DebugStream { let io = DebugStream {
tcp: tcp, tcp,
shutdown_called: false, shutdown_called: false,
}; };

View File

@@ -1457,7 +1457,7 @@ async fn max_buf_size() {
thread::spawn(move || { thread::spawn(move || {
let mut tcp = connect(&addr); let mut tcp = connect(&addr);
tcp.write_all(b"POST /").expect("write 1"); tcp.write_all(b"POST /").expect("write 1");
tcp.write_all(&vec![b'a'; MAX]).expect("write 2"); tcp.write_all(&[b'a'; MAX]).expect("write 2");
let mut buf = [0; 256]; let mut buf = [0; 256];
tcp.read(&mut buf).expect("read 1"); tcp.read(&mut buf).expect("read 1");
@@ -1481,8 +1481,8 @@ fn streaming_body() {
// disable keep-alive so we can use read_to_end // disable keep-alive so we can use read_to_end
let server = serve_opts().keep_alive(false).serve(); let server = serve_opts().keep_alive(false).serve();
static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _; static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _;
let b = ::futures_util::stream::iter(S.into_iter()).map(|&s| Ok::<_, hyper::Error>(s)); let b = futures_util::stream::iter(S.iter()).map(|&s| Ok::<_, hyper::Error>(s));
let b = hyper::Body::wrap_stream(b); let b = hyper::Body::wrap_stream(b);
server.reply().body_stream(b); server.reply().body_stream(b);
@@ -1588,7 +1588,7 @@ fn http2_body_user_error_sends_reset_reason() {
let server = serve(); let server = serve();
let addr_str = format!("http://{}", server.addr()); let addr_str = format!("http://{}", server.addr());
let b = ::futures_util::stream::once(future::err::<String, _>(h2::Error::from( let b = futures_util::stream::once(future::err::<String, _>(h2::Error::from(
h2::Reason::INADEQUATE_SECURITY, h2::Reason::INADEQUATE_SECURITY,
))); )));
let b = hyper::Body::wrap_stream(b); let b = hyper::Body::wrap_stream(b);
@@ -1931,7 +1931,7 @@ impl TestService {
} }
} }
const HELLO: &'static str = "hello"; const HELLO: &str = "hello";
struct HelloWorld; struct HelloWorld;
@@ -2030,8 +2030,8 @@ impl ServeOptions {
let msg_tx = msg_tx.clone(); let msg_tx = msg_tx.clone();
let reply_rx = reply_rx.clone(); let reply_rx = reply_rx.clone();
future::ok::<_, BoxError>(TestService { future::ok::<_, BoxError>(TestService {
tx: msg_tx.clone(), tx: msg_tx,
reply: reply_rx.clone(), reply: reply_rx,
}) })
}); });
@@ -2056,9 +2056,9 @@ impl ServeOptions {
let addr = addr_rx.recv().expect("server addr rx"); let addr = addr_rx.recv().expect("server addr rx");
Serve { Serve {
msg_rx: msg_rx, msg_rx,
reply_tx: Mutex::new(reply_tx), reply_tx: Mutex::new(reply_tx),
addr: addr, addr,
shutdown_signal: Some(shutdown_tx), shutdown_signal: Some(shutdown_tx),
thread: Some(thread), thread: Some(thread),
} }

View File

@@ -375,7 +375,7 @@ async fn async_test(cfg: __TestConfig) {
let mut addr = server.local_addr(); let mut addr = server.local_addr();
tokio::task::spawn(server.map(|result| { tokio::task::spawn(server.map(|result| {
let _ = result.expect("server error"); result.expect("server error");
})); }));
if cfg.proxy { if cfg.proxy {
@@ -460,7 +460,7 @@ fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future<Output = ()>) {
let srv = Server::bind(&([127, 0, 0, 1], 0).into()).serve(make_service_fn(move |_| { let srv = Server::bind(&([127, 0, 0, 1], 0).into()).serve(make_service_fn(move |_| {
let prev = counter.fetch_add(1, Ordering::Relaxed); let prev = counter.fetch_add(1, Ordering::Relaxed);
assert!(max_connections >= prev + 1, "proxy max connections"); assert!(max_connections > prev, "proxy max connections");
let client = client.clone(); let client = client.clone();
future::ok::<_, hyper::Error>(service_fn(move |mut req| { future::ok::<_, hyper::Error>(service_fn(move |mut req| {
let uri = format!("http://{}{}", dst_addr, req.uri().path()) let uri = format!("http://{}{}", dst_addr, req.uri().path())