diff --git a/examples/server.rs b/examples/server.rs index 870e23c..86f7a3d 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -6,8 +6,8 @@ use bytes::*; use futures::*; use http::{Response, StatusCode}; -use tokio::net::{TcpListener, TcpStream}; use std::error::Error; +use tokio::net::{TcpListener, TcpStream}; #[tokio::main] pub async fn main() -> Result<(), Box> { @@ -21,7 +21,7 @@ pub async fn main() -> Result<(), Box> { while let Some(socket) = incoming.next().await { tokio::spawn(async move { if let Err(e) = handle(socket).await { - println!(" -> err={:?}", e); + println!(" -> err={:?}", e); } }); } @@ -45,6 +45,6 @@ async fn handle(socket: io::Result) -> Result<(), Box> { } println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~"); - + Ok(()) -} \ No newline at end of file +} diff --git a/src/client.rs b/src/client.rs index 70b84fe..c632c6d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -77,7 +77,7 @@ //! #[tokio::main] //! pub async fn main() -> Result<(), Box> { //! let addr = "127.0.0.1:5928".parse().unwrap(); -//! +//! //! // Establish TCP connection to the server. //! let tcp = TcpStream::connect(&addr).await?; //! let (h2, connection) = client::handshake(tcp).await?; @@ -96,7 +96,7 @@ //! // Send the request. The second tuple item allows the caller //! // to stream a request body. //! let (response, _) = h2.send_request(request, true).unwrap(); -//! +//! //! let (head, mut body) = response.await?.into_parts(); //! //! println!("Received response: {:?}", head); @@ -585,7 +585,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match &mut self.inner { Some(send_request) => { - let _ = ready!(send_request.poll_ready(cx))?; + ready!(send_request.poll_ready(cx))?; } None => panic!("called `poll` after future completed"), } @@ -1269,7 +1269,7 @@ impl Future for ResponseFuture { let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts(); let body = RecvStream::new(ReleaseCapacity::new(self.inner.clone())); - Poll::Ready(Ok(Response::from_parts(parts, body).into())) + Poll::Ready(Ok(Response::from_parts(parts, body))) } } diff --git a/src/codec/error.rs b/src/codec/error.rs index c6edffc..5155e0d 100644 --- a/src/codec/error.rs +++ b/src/codec/error.rs @@ -76,9 +76,7 @@ impl error::Error for RecvError { match *self { Connection(ref reason) => reason.description(), - Stream { - ref reason, .. - } => reason.description(), + Stream { ref reason, .. } => reason.description(), Io(ref e) => e.description(), } } diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index 59d08a8..88d1007 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -14,8 +14,8 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio_codec::{LengthDelimitedCodec, LengthDelimitedCodecError}; use tokio_codec::FramedRead as InnerFramedRead; +use tokio_codec::{LengthDelimitedCodec, LengthDelimitedCodecError}; use tokio_io::AsyncRead; // 16 MB "sane default" taken from golang http2 @@ -52,7 +52,7 @@ enum Continuable { impl FramedRead { pub fn new(inner: InnerFramedRead) -> FramedRead { FramedRead { - inner: inner, + inner, hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE), max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE, partial: None, diff --git a/src/frame/data.rs b/src/frame/data.rs index e2938a6..91de52d 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -1,5 +1,5 @@ -use bytes::{Buf, BufMut, Bytes}; use crate::frame::{util, Error, Frame, Head, Kind, StreamId}; +use bytes::{Buf, BufMut, Bytes}; use std::fmt; @@ -29,7 +29,7 @@ impl Data { assert!(!stream_id.is_zero()); Data { - stream_id: stream_id, + stream_id, data: payload, flags: DataFlags::default(), pad_len: None, @@ -135,8 +135,8 @@ impl Data { Ok(Data { stream_id: head.stream_id(), data: payload, - flags: flags, - pad_len: pad_len, + flags, + pad_len, }) } } diff --git a/src/frame/go_away.rs b/src/frame/go_away.rs index 95c1537..4dfdcd7 100644 --- a/src/frame/go_away.rs +++ b/src/frame/go_away.rs @@ -1,6 +1,6 @@ use crate::frame::{self, Error, Head, Kind, Reason, StreamId}; -use bytes::{BufMut}; +use bytes::BufMut; #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub struct GoAway { @@ -33,7 +33,7 @@ impl GoAway { let error_code = unpack_octets_4!(payload, 4, u32); Ok(GoAway { - last_stream_id: last_stream_id, + last_stream_id, error_code: error_code.into(), }) } diff --git a/src/frame/head.rs b/src/frame/head.rs index a72c5b3..c99306f 100644 --- a/src/frame/head.rs +++ b/src/frame/head.rs @@ -1,6 +1,6 @@ use super::StreamId; -use bytes::{BufMut}; +use bytes::BufMut; #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct Head { @@ -30,9 +30,9 @@ pub enum Kind { impl Head { pub fn new(kind: Kind, flag: u8, stream_id: StreamId) -> Head { Head { - kind: kind, - flag: flag, - stream_id: stream_id, + kind, + flag, + stream_id, } } diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 6f97875..92adffb 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -2,8 +2,8 @@ use super::{util, StreamDependency, StreamId}; use crate::frame::{Error, Frame, Head, Kind}; use crate::hpack; -use http::{uri, HeaderMap, Method, StatusCode, Uri}; use http::header::{self, HeaderName, HeaderValue}; +use http::{uri, HeaderMap, Method, StatusCode, Uri}; use bytes::{Bytes, BytesMut}; use string::String; @@ -118,12 +118,12 @@ impl Headers { /// Create a new HEADERS frame pub fn new(stream_id: StreamId, pseudo: Pseudo, fields: HeaderMap) -> Self { Headers { - stream_id: stream_id, + stream_id, stream_dep: None, header_block: HeaderBlock { - fields: fields, + fields, is_over_size: false, - pseudo: pseudo, + pseudo, }, flags: HeadersFlag::default(), } @@ -137,11 +137,11 @@ impl Headers { stream_id, stream_dep: None, header_block: HeaderBlock { - fields: fields, + fields, is_over_size: false, pseudo: Pseudo::default(), }, - flags: flags, + flags, } } @@ -156,7 +156,7 @@ impl Headers { // Read the padding length if flags.is_padded() { - if src.len() < 1 { + if src.is_empty() { return Err(Error::MalformedMessage); } pad = src[0] as usize; @@ -195,19 +195,24 @@ impl Headers { let headers = Headers { stream_id: head.stream_id(), - stream_dep: stream_dep, + stream_dep, header_block: HeaderBlock { fields: HeaderMap::new(), is_over_size: false, pseudo: Pseudo::default(), }, - flags: flags, + flags, }; Ok((headers, src)) } - pub fn load_hpack(&mut self, src: &mut BytesMut, max_header_list_size: usize, decoder: &mut hpack::Decoder) -> Result<(), Error> { + pub fn load_hpack( + &mut self, + src: &mut BytesMut, + max_header_list_size: usize, + decoder: &mut hpack::Decoder, + ) -> Result<(), Error> { self.header_block.load(src, max_header_list_size, decoder) } @@ -263,9 +268,9 @@ impl Headers { // Get the HEADERS frame head let head = self.head(); - self.header_block.into_encoding() - .encode(&head, encoder, dst, |_| { - }) + self.header_block + .into_encoding() + .encode(&head, encoder, dst, |_| {}) } fn head(&self) -> Head { @@ -307,7 +312,7 @@ impl PushPromise { // Read the padding length if flags.is_padded() { - if src.len() < 1 { + if src.is_empty() { return Err(Error::MalformedMessage); } @@ -336,19 +341,24 @@ impl PushPromise { } let frame = PushPromise { - flags: flags, + flags, header_block: HeaderBlock { fields: HeaderMap::new(), is_over_size: false, pseudo: Pseudo::default(), }, - promised_id: promised_id, + promised_id, stream_id: head.stream_id(), }; Ok((frame, src)) } - pub fn load_hpack(&mut self, src: &mut BytesMut, max_header_list_size: usize, decoder: &mut hpack::Decoder) -> Result<(), Error> { + pub fn load_hpack( + &mut self, + src: &mut BytesMut, + max_header_list_size: usize, + decoder: &mut hpack::Decoder, + ) -> Result<(), Error> { self.header_block.load(src, max_header_list_size, decoder) } @@ -381,7 +391,8 @@ impl PushPromise { let head = self.head(); let promised_id = self.promised_id; - self.header_block.into_encoding() + self.header_block + .into_encoding() .encode(&head, encoder, dst, |dst| { dst.put_u32_be(promised_id.into()); }) @@ -456,9 +467,7 @@ impl Continuation { // Get the CONTINUATION frame head let head = self.head(); - self.header_block - .encode(&head, encoder, dst, |_| { - }) + self.header_block.encode(&head, encoder, dst, |_| {}) } } @@ -471,7 +480,7 @@ impl Pseudo { let mut path = parts .path_and_query .map(|v| v.into()) - .unwrap_or_else(|| Bytes::new()); + .unwrap_or_else(Bytes::new); if path.is_empty() && method != Method::OPTIONS { path = Bytes::from_static(b"/"); @@ -527,13 +536,15 @@ fn to_string(src: Bytes) -> String { // ===== impl EncodingHeaderBlock ===== impl EncodingHeaderBlock { - fn encode(mut self, - head: &Head, - encoder: &mut hpack::Encoder, - dst: &mut BytesMut, - f: F) - -> Option - where F: FnOnce(&mut BytesMut), + fn encode( + mut self, + head: &Head, + encoder: &mut hpack::Encoder, + dst: &mut BytesMut, + f: F, + ) -> Option + where + F: FnOnce(&mut BytesMut), { let head_pos = dst.len(); @@ -610,12 +621,9 @@ impl Iterator for Iter { self.pseudo = None; - self.fields.next().map(|(name, value)| { - Field { - name: name, - value: value, - } - }) + self.fields + .next() + .map(|(name, value)| Field { name, value }) } } @@ -727,9 +735,13 @@ impl fmt::Debug for PushPromiseFlag { // ===== HeaderBlock ===== - impl HeaderBlock { - fn load(&mut self, src: &mut BytesMut, max_header_list_size: usize, decoder: &mut hpack::Decoder) -> Result<(), Error> { + fn load( + &mut self, + src: &mut BytesMut, + max_header_list_size: usize, + decoder: &mut hpack::Decoder, + ) -> Result<(), Error> { let mut reg = !self.fields.is_empty(); let mut malformed = false; let mut headers_size = self.calculate_header_list_size(); @@ -744,7 +756,8 @@ impl HeaderBlock { malformed = true; } else { let __val = $val; - headers_size += decoded_header_size(stringify!($ident).len() + 1, __val.as_str().len()); + headers_size += + decoded_header_size(stringify!($ident).len() + 1, __val.as_str().len()); if headers_size < max_header_list_size { self.pseudo.$field = Some(__val); } else if !self.is_over_size { @@ -752,7 +765,7 @@ impl HeaderBlock { self.is_over_size = true; } } - }} + }}; } let mut cursor = Cursor::new(src); @@ -765,10 +778,7 @@ impl HeaderBlock { use crate::hpack::Header::*; match header { - Field { - name, - value, - } => { + Field { name, value } => { // Connection level header fields are not supported and must // result in a protocol error. @@ -794,7 +804,7 @@ impl HeaderBlock { self.is_over_size = true; } } - }, + } Authority(v) => set_pseudo!(authority, v), Method(v) => set_pseudo!(method, v), Scheme(v) => set_pseudo!(scheme, v), @@ -810,7 +820,7 @@ impl HeaderBlock { if malformed { log::trace!("malformed message"); - return Err(Error::MalformedMessage.into()); + return Err(Error::MalformedMessage); } Ok(()) @@ -835,36 +845,38 @@ impl HeaderBlock { /// > overhead of 32 octets for each header field. fn calculate_header_list_size(&self) -> usize { macro_rules! pseudo_size { - ($name:ident) => ({ + ($name:ident) => {{ self.pseudo .$name .as_ref() .map(|m| decoded_header_size(stringify!($name).len() + 1, m.as_str().len())) .unwrap_or(0) - }); + }}; } - pseudo_size!(method) + - pseudo_size!(scheme) + - pseudo_size!(status) + - pseudo_size!(authority) + - pseudo_size!(path) + - self.fields.iter() - .map(|(name, value)| decoded_header_size(name.as_str().len(), value.len())) - .sum::() + pseudo_size!(method) + + pseudo_size!(scheme) + + pseudo_size!(status) + + pseudo_size!(authority) + + pseudo_size!(path) + + self + .fields + .iter() + .map(|(name, value)| decoded_header_size(name.as_str().len(), value.len())) + .sum::() } /// Iterate over all pseudos and headers to see if any individual pair /// would be too large to encode. pub(crate) fn has_too_big_field(&self) -> bool { macro_rules! pseudo_size { - ($name:ident) => ({ + ($name:ident) => {{ self.pseudo .$name .as_ref() .map(|m| decoded_header_size(stringify!($name).len() + 1, m.as_str().len())) .unwrap_or(0) - }); + }}; } if pseudo_size!(method) > MAX_HEADER_LENGTH { diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 82335b9..ee371ee 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -18,12 +18,12 @@ use std::fmt; #[macro_escape] macro_rules! unpack_octets_4 { // TODO: Get rid of this macro - ($buf:expr, $offset:expr, $tip:ty) => ( - (($buf[$offset + 0] as $tip) << 24) | - (($buf[$offset + 1] as $tip) << 16) | - (($buf[$offset + 2] as $tip) << 8) | - (($buf[$offset + 3] as $tip) << 0) - ); + ($buf:expr, $offset:expr, $tip:ty) => { + (($buf[$offset + 0] as $tip) << 24) + | (($buf[$offset + 1] as $tip) << 16) + | (($buf[$offset + 2] as $tip) << 8) + | (($buf[$offset + 3] as $tip) << 0) + }; } mod data; @@ -54,11 +54,8 @@ pub use self::window_update::WindowUpdate; // Re-export some constants pub use self::settings::{ - DEFAULT_INITIAL_WINDOW_SIZE, - DEFAULT_MAX_FRAME_SIZE, - DEFAULT_SETTINGS_HEADER_TABLE_SIZE, - MAX_INITIAL_WINDOW_SIZE, - MAX_MAX_FRAME_SIZE, + DEFAULT_INITIAL_WINDOW_SIZE, DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, + MAX_INITIAL_WINDOW_SIZE, MAX_MAX_FRAME_SIZE, }; pub type FrameSize = u32; diff --git a/src/frame/ping.rs b/src/frame/ping.rs index 0bf38b3..df96080 100644 --- a/src/frame/ping.rs +++ b/src/frame/ping.rs @@ -1,5 +1,5 @@ -use bytes::{Buf, BufMut, IntoBuf}; use crate::frame::{Error, Frame, Head, Kind, StreamId}; +use bytes::{Buf, BufMut, IntoBuf}; const ACK_FLAG: u8 = 0x1; @@ -17,7 +17,6 @@ const SHUTDOWN_PAYLOAD: Payload = [0x0b, 0x7b, 0xa2, 0xf0, 0x8b, 0x9b, 0xfe, 0x5 const USER_PAYLOAD: Payload = [0x3b, 0x7c, 0xdb, 0x7a, 0x0b, 0x87, 0x16, 0xb4]; impl Ping { - #[cfg(feature = "unstable")] pub const SHUTDOWN: Payload = SHUTDOWN_PAYLOAD; @@ -38,10 +37,7 @@ impl Ping { } pub fn pong(payload: Payload) -> Ping { - Ping { - ack: true, - payload, - } + Ping { ack: true, payload } } pub fn is_ack(&self) -> bool { @@ -84,10 +80,7 @@ impl Ping { // endpoint MUST NOT respond to PING frames containing this flag. let ack = head.flag() & ACK_FLAG != 0; - Ok(Ping { - ack, - payload, - }) + Ok(Ping { ack, payload }) } pub fn encode(&self, dst: &mut B) { diff --git a/src/frame/priority.rs b/src/frame/priority.rs index 2ca08ac..d7d47db 100644 --- a/src/frame/priority.rs +++ b/src/frame/priority.rs @@ -30,7 +30,7 @@ impl Priority { Ok(Priority { stream_id: head.stream_id(), - dependency: dependency, + dependency, }) } } diff --git a/src/frame/reason.rs b/src/frame/reason.rs index a33b988..031b6cd 100644 --- a/src/frame/reason.rs +++ b/src/frame/reason.rs @@ -1,6 +1,5 @@ use std::fmt; - /// HTTP/2.0 error codes. /// /// Error codes are used in `RST_STREAM` and `GOAWAY` frames to convey the @@ -76,7 +75,7 @@ impl Reason { 10 => { "connection established in response to a CONNECT request was reset or abnormally \ closed" - }, + } 11 => "detected excessive load generating behavior", 12 => "security properties do not meet minimum requirements", 13 => "endpoint requires HTTP/1.1", @@ -114,9 +113,7 @@ impl fmt::Debug for Reason { 11 => "ENHANCE_YOUR_CALM", 12 => "INADEQUATE_SECURITY", 13 => "HTTP_1_1_REQUIRED", - other => return f.debug_tuple("Reason") - .field(&Hex(other)) - .finish(), + other => return f.debug_tuple("Reason").field(&Hex(other)).finish(), }; f.write_str(name) } diff --git a/src/frame/reset.rs b/src/frame/reset.rs index 888961c..e58294e 100644 --- a/src/frame/reset.rs +++ b/src/frame/reset.rs @@ -1,6 +1,6 @@ use crate::frame::{self, Error, Head, Kind, Reason, StreamId}; -use bytes::{BufMut}; +use bytes::BufMut; #[derive(Debug, Eq, PartialEq)] pub struct Reset { diff --git a/src/frame/settings.rs b/src/frame/settings.rs index f6bd9bc..248c095 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -1,7 +1,7 @@ use std::fmt; -use bytes::{BufMut, BytesMut}; use crate::frame::{util, Error, Frame, FrameSize, Head, Kind, StreamId}; +use bytes::{BufMut, BytesMut}; #[derive(Clone, Default, Eq, PartialEq)] pub struct Settings { @@ -121,7 +121,7 @@ impl Settings { if flag.is_ack() { // Ensure that the payload is empty - if payload.len() > 0 { + if !payload.is_empty() { return Err(Error::InvalidPayloadLength); } @@ -142,34 +142,36 @@ impl Settings { match Setting::load(raw) { Some(HeaderTableSize(val)) => { settings.header_table_size = Some(val); - }, + } Some(EnablePush(val)) => match val { 0 | 1 => { settings.enable_push = Some(val); - }, + } _ => { return Err(Error::InvalidSettingValue); - }, + } }, Some(MaxConcurrentStreams(val)) => { settings.max_concurrent_streams = Some(val); - }, - Some(InitialWindowSize(val)) => if val as usize > MAX_INITIAL_WINDOW_SIZE { - return Err(Error::InvalidSettingValue); - } else { - settings.initial_window_size = Some(val); - }, + } + Some(InitialWindowSize(val)) => { + if val as usize > MAX_INITIAL_WINDOW_SIZE { + return Err(Error::InvalidSettingValue); + } else { + settings.initial_window_size = Some(val); + } + } Some(MaxFrameSize(val)) => { if val < DEFAULT_MAX_FRAME_SIZE || val > MAX_MAX_FRAME_SIZE { return Err(Error::InvalidSettingValue); } else { settings.max_frame_size = Some(val); } - }, + } Some(MaxHeaderListSize(val)) => { settings.max_header_list_size = Some(val); - }, - None => {}, + } + None => {} } } @@ -294,7 +296,7 @@ impl Setting { /// /// If given a buffer shorter than 6 bytes, the function will panic. fn load(raw: &[u8]) -> Option { - let id: u16 = ((raw[0] as u16) << 8) | (raw[1] as u16); + let id: u16 = (u16::from(raw[0]) << 8) | u16::from(raw[1]); let val: u32 = unpack_octets_4!(raw, 2, u32); Setting::from_id(id, val) diff --git a/src/frame/util.rs b/src/frame/util.rs index a66bebe..6bee7bd 100644 --- a/src/frame/util.rs +++ b/src/frame/util.rs @@ -38,7 +38,10 @@ pub fn strip_padding(payload: &mut Bytes) -> Result { Ok(pad_len as u8) } -pub(super) fn debug_flags<'a, 'f: 'a>(fmt: &'a mut fmt::Formatter<'f>, bits: u8) -> DebugFlags<'a, 'f> { +pub(super) fn debug_flags<'a, 'f: 'a>( + fmt: &'a mut fmt::Formatter<'f>, + bits: u8, +) -> DebugFlags<'a, 'f> { let result = write!(fmt, "({:#x}", bits); DebugFlags { fmt, @@ -71,8 +74,6 @@ impl<'a, 'f: 'a> DebugFlags<'a, 'f> { } pub(super) fn finish(&mut self) -> fmt::Result { - self.result.and_then(|()| { - write!(self.fmt, ")") - }) + self.result.and_then(|()| write!(self.fmt, ")")) } } diff --git a/src/frame/window_update.rs b/src/frame/window_update.rs index 1877519..73e4a20 100644 --- a/src/frame/window_update.rs +++ b/src/frame/window_update.rs @@ -1,6 +1,6 @@ use crate::frame::{self, Error, Head, Kind, StreamId}; -use bytes::{BufMut}; +use bytes::BufMut; const SIZE_INCREMENT_MASK: u32 = 1 << 31; @@ -38,7 +38,7 @@ impl WindowUpdate { let size_increment = unpack_octets_4!(payload, 0, u32) & !SIZE_INCREMENT_MASK; if size_increment == 0 { - return Err(Error::InvalidWindowUpdateValue.into()); + return Err(Error::InvalidWindowUpdateValue); } Ok(WindowUpdate { diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index 24f7167..be0152f 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -168,7 +168,11 @@ impl Decoder { } /// Decodes the headers found in the given buffer. - pub fn decode(&mut self, src: &mut Cursor<&mut BytesMut>, mut f: F) -> Result<(), DecoderError> + pub fn decode( + &mut self, + src: &mut Cursor<&mut BytesMut>, + mut f: F, + ) -> Result<(), DecoderError> where F: FnMut(Header), { @@ -193,7 +197,7 @@ impl Decoder { let entry = self.decode_indexed(src)?; consume(src); f(entry); - }, + } LiteralWithIndexing => { log::trace!(" LiteralWithIndexing; rem={:?}", src.remaining()); can_resize = false; @@ -204,14 +208,14 @@ impl Decoder { consume(src); f(entry); - }, + } LiteralWithoutIndexing => { log::trace!(" LiteralWithoutIndexing; rem={:?}", src.remaining()); can_resize = false; let entry = self.decode_literal(src, false)?; consume(src); f(entry); - }, + } LiteralNeverIndexed => { log::trace!(" LiteralNeverIndexed; rem={:?}", src.remaining()); can_resize = false; @@ -221,7 +225,7 @@ impl Decoder { // TODO: Track that this should never be indexed f(entry); - }, + } SizeUpdate => { log::trace!(" SizeUpdate; rem={:?}", src.remaining()); if !can_resize { @@ -231,7 +235,7 @@ impl Decoder { // Handle the dynamic table size update self.process_size_update(src)?; consume(src); - }, + } } } @@ -287,7 +291,7 @@ impl Decoder { } fn decode_string(&mut self, buf: &mut Cursor<&mut BytesMut>) -> Result { - const HUFF_FLAG: u8 = 0b10000000; + const HUFF_FLAG: u8 = 0b1000_0000; // The first bit in the first byte contains the huffman encoded flag. let huff = match peek_u8(buf) { @@ -331,12 +335,12 @@ impl Default for Decoder { impl Representation { pub fn load(byte: u8) -> Result { - const INDEXED: u8 = 0b10000000; - const LITERAL_WITH_INDEXING: u8 = 0b01000000; - const LITERAL_WITHOUT_INDEXING: u8 = 0b11110000; - const LITERAL_NEVER_INDEXED: u8 = 0b00010000; - const SIZE_UPDATE_MASK: u8 = 0b11100000; - const SIZE_UPDATE: u8 = 0b00100000; + const INDEXED: u8 = 0b1000_0000; + const LITERAL_WITH_INDEXING: u8 = 0b0100_0000; + const LITERAL_WITHOUT_INDEXING: u8 = 0b1111_0000; + const LITERAL_NEVER_INDEXED: u8 = 0b0001_0000; + const SIZE_UPDATE_MASK: u8 = 0b1110_0000; + const SIZE_UPDATE: u8 = 0b0010_0000; // TODO: What did I even write here? @@ -361,8 +365,8 @@ fn decode_int(buf: &mut B, prefix_size: u8) -> Result 8 { return Err(DecoderError::InvalidIntegerPrefix); @@ -445,7 +449,7 @@ impl Table { Table { entries: VecDeque::new(), size: 0, - max_size: max_size, + max_size, } } @@ -516,7 +520,7 @@ impl Table { // Can never happen as the size of the table must reach // 0 by the time we've exhausted all elements. panic!("Size of table != 0, but no headers left!"); - }, + } }; self.size -= last.len(); @@ -827,15 +831,20 @@ mod test { let mut buf = buf.into(); let mut res = vec![]; - let _ = de.decode(&mut Cursor::new(&mut buf), |h| { - res.push(h); - }).unwrap(); + let _ = de + .decode(&mut Cursor::new(&mut buf), |h| { + res.push(h); + }) + .unwrap(); assert_eq!(res.len(), 1); assert_eq!(de.table.size(), 0); match res[0] { - Header::Field { ref name, ref value } => { + Header::Field { + ref name, + ref value, + } => { assert_eq!(name, "foo"); assert_eq!(value, "bar"); } diff --git a/src/hpack/encoder.rs b/src/hpack/encoder.rs index 53e3cec..74aae3f 100644 --- a/src/hpack/encoder.rs +++ b/src/hpack/encoder.rs @@ -1,5 +1,5 @@ -use super::{huffman, Header}; use super::table::{Index, Table}; +use super::{huffman, Header}; use bytes::{BufMut, BytesMut}; use http::header::{HeaderName, HeaderValue}; @@ -47,27 +47,31 @@ impl Encoder { #[allow(dead_code)] pub fn update_max_size(&mut self, val: usize) { match self.size_update { - Some(SizeUpdate::One(old)) => if val > old { - if old > self.table.max_size() { + Some(SizeUpdate::One(old)) => { + if val > old { + if old > self.table.max_size() { + self.size_update = Some(SizeUpdate::One(val)); + } else { + self.size_update = Some(SizeUpdate::Two(old, val)); + } + } else { + self.size_update = Some(SizeUpdate::One(val)); + } + } + Some(SizeUpdate::Two(min, _)) => { + if val < min { self.size_update = Some(SizeUpdate::One(val)); } else { - self.size_update = Some(SizeUpdate::Two(old, val)); + self.size_update = Some(SizeUpdate::Two(min, val)); } - } else { - self.size_update = Some(SizeUpdate::One(val)); - }, - Some(SizeUpdate::Two(min, _)) => if val < min { - self.size_update = Some(SizeUpdate::One(val)); - } else { - self.size_update = Some(SizeUpdate::Two(min, val)); - }, + } None => { if val != self.table.max_size() { // Don't bother writing a frame if the value already matches // the table's max size. self.size_update = Some(SizeUpdate::One(val)); } - }, + } } } @@ -120,14 +124,11 @@ impl Encoder { if res.is_err() { dst.truncate(len); - return Encode::Partial(EncodeState { - index: index, - value: None, - }); + return Encode::Partial(EncodeState { index, value: None }); } last_index = Some(index); - }, + } // The header does not have an associated name. This means that // the name is the same as the previously yielded header. In // which case, we skip table lookup and just use the same index @@ -148,7 +149,7 @@ impl Encoder { value: Some(value), }); } - }, + } }; } @@ -160,14 +161,14 @@ impl Encoder { Some(SizeUpdate::One(val)) => { self.table.resize(val); encode_size_update(val, dst)?; - }, + } Some(SizeUpdate::Two(min, max)) => { self.table.resize(min); self.table.resize(max); encode_size_update(min, dst)?; encode_size_update(max, dst)?; - }, - None => {}, + } + None => {} } Ok(()) @@ -177,12 +178,12 @@ impl Encoder { match *index { Index::Indexed(idx, _) => { encode_int(idx, 7, 0x80, dst)?; - }, + } Index::Name(idx, _) => { let header = self.table.resolve(&index); encode_not_indexed(idx, header.value_slice(), header.is_sensitive(), dst)?; - }, + } Index::Inserted(_) => { let header = self.table.resolve(&index); @@ -192,19 +193,19 @@ impl Encoder { return Err(EncoderError::BufferOverflow); } - dst.put_u8(0b01000000); + dst.put_u8(0b0100_0000); encode_str(header.name().as_slice(), dst)?; encode_str(header.value_slice(), dst)?; - }, + } Index::InsertedValue(idx, _) => { let header = self.table.resolve(&index); assert!(!header.is_sensitive()); - encode_int(idx, 6, 0b01000000, dst)?; + encode_int(idx, 6, 0b0100_0000, dst)?; encode_str(header.value_slice(), dst)?; - }, + } Index::NotIndexed(_) => { let header = self.table.resolve(&index); @@ -214,7 +215,7 @@ impl Encoder { header.is_sensitive(), dst, )?; - }, + } } Ok(()) @@ -227,14 +228,14 @@ impl Encoder { dst: &mut BytesMut, ) -> Result<(), EncoderError> { match *last { - Index::Indexed(..) | - Index::Name(..) | - Index::Inserted(..) | - Index::InsertedValue(..) => { + Index::Indexed(..) + | Index::Name(..) + | Index::Inserted(..) + | Index::InsertedValue(..) => { let idx = self.table.resolve_idx(last); encode_not_indexed(idx, value.as_ref(), value.is_sensitive(), dst)?; - }, + } Index::NotIndexed(_) => { let last = self.table.resolve(last); @@ -244,7 +245,7 @@ impl Encoder { value.is_sensitive(), dst, )?; - }, + } } Ok(()) @@ -258,7 +259,7 @@ impl Default for Encoder { } fn encode_size_update(val: usize, dst: &mut B) -> Result<(), EncoderError> { - encode_int(val, 5, 0b00100000, dst) + encode_int(val, 5, 0b0010_0000, dst) } fn encode_not_indexed( @@ -305,7 +306,7 @@ fn encode_str(val: &[u8], dst: &mut BytesMut) -> Result<(), EncoderError> { return Err(EncoderError::BufferOverflow); } - if val.len() != 0 { + if !val.is_empty() { let idx = dst.len(); // Push a placeholder byte for the length header @@ -378,7 +379,7 @@ fn encode_int( value -= low; - if value > 0x0fffffff { + if value > 0x0fff_ffff { panic!("value out of range"); } @@ -390,10 +391,10 @@ fn encode_int( return Err(EncoderError::BufferOverflow); } - dst.put_u8(0b10000000 | value as u8); + dst.put_u8(0b1000_0000 | value as u8); rem -= 1; - value = value >> 7; + value >>= 7; } if rem == 0 { @@ -560,7 +561,7 @@ mod test { let header = Header::Field { name: Some(name), - value: value, + value, }; // Now, try to encode the sensitive header @@ -580,7 +581,7 @@ mod test { let header = Header::Field { name: Some(name), - value: value, + value, }; let mut encoder = Encoder::default(); @@ -604,7 +605,7 @@ mod test { let header = Header::Field { name: Some(name), - value: value, + value, }; let res = encode(&mut encoder, vec![header]); @@ -808,7 +809,8 @@ mod test { name: None, value: HeaderValue::from_bytes(b"sup").unwrap(), }, - ].into_iter(); + ] + .into_iter(); let resume = match encoder.encode(None, &mut input, &mut dst) { Encode::Partial(r) => r, @@ -823,7 +825,7 @@ mod test { dst.clear(); match encoder.encode(Some(resume), &mut input, &mut dst) { - Encode::Full => {}, + Encode::Full => {} unexpected => panic!("resume returned unexpected: {:?}", unexpected), } @@ -856,7 +858,7 @@ mod test { Header::Field { name: Some(name), - value: value, + value, } } diff --git a/src/hpack/header.rs b/src/hpack/header.rs index 9034a03..c2181d6 100644 --- a/src/hpack/header.rs +++ b/src/hpack/header.rs @@ -1,8 +1,8 @@ use super::{DecoderError, NeedMore}; use bytes::Bytes; -use http::{Method, StatusCode}; use http::header::{HeaderName, HeaderValue}; +use http::{Method, StatusCode}; use string::{String, TryFrom}; /// HTTP/2.0 Header @@ -41,14 +41,8 @@ impl Header> { Field { name: Some(n), value, - } => Field { - name: n, - value: value, - }, - Field { - name: None, - value, - } => return Err(value), + } => Field { name: n, value }, + Field { name: None, value } => return Err(value), Authority(v) => Authority(v), Method(v) => Method(v), Scheme(v) => Scheme(v), @@ -60,7 +54,7 @@ impl Header> { impl Header { pub fn new(name: Bytes, value: Bytes) -> Result { - if name.len() == 0 { + if name.is_empty() { return Err(DecoderError::NeedMore(NeedMore::UnexpectedEndOfStream)); } if name[0] == b':' { @@ -68,23 +62,23 @@ impl Header { b"authority" => { let value = String::try_from(value)?; Ok(Header::Authority(value)) - }, + } b"method" => { let method = Method::from_bytes(&value)?; Ok(Header::Method(method)) - }, + } b"scheme" => { let value = String::try_from(value)?; Ok(Header::Scheme(value)) - }, + } b"path" => { let value = String::try_from(value)?; Ok(Header::Path(value)) - }, + } b"status" => { let status = StatusCode::from_bytes(&value)?; Ok(Header::Status(status)) - }, + } _ => Err(DecoderError::InvalidPseudoheader), } } else { @@ -92,10 +86,7 @@ impl Header { let name = HeaderName::from_lowercase(&name)?; let value = HeaderValue::from_bytes(&value)?; - Ok(Header::Field { - name: name, - value: value, - }) + Ok(Header::Field { name, value }) } } @@ -116,9 +107,7 @@ impl Header { /// Returns the header name pub fn name(&self) -> Name { match *self { - Header::Field { - ref name, .. - } => Name::Field(name), + Header::Field { ref name, .. } => Name::Field(name), Header::Authority(..) => Name::Authority, Header::Method(..) => Name::Method, Header::Scheme(..) => Name::Scheme, @@ -129,9 +118,7 @@ impl Header { pub fn value_slice(&self) -> &[u8] { match *self { - Header::Field { - ref value, .. - } => value.as_ref(), + Header::Field { ref value, .. } => value.as_ref(), Header::Authority(ref v) => v.as_ref(), Header::Method(ref v) => v.as_ref().as_ref(), Header::Scheme(ref v) => v.as_ref(), @@ -142,17 +129,13 @@ impl Header { pub fn value_eq(&self, other: &Header) -> bool { match *self { - Header::Field { - ref value, .. - } => { + Header::Field { ref value, .. } => { let a = value; match *other { - Header::Field { - ref value, .. - } => a == value, + Header::Field { ref value, .. } => a == value, _ => false, } - }, + } Header::Authority(ref a) => match *other { Header::Authority(ref b) => a == b, _ => false, @@ -178,9 +161,7 @@ impl Header { pub fn is_sensitive(&self) -> bool { match *self { - Header::Field { - ref value, .. - } => value.is_sensitive(), + Header::Field { ref value, .. } => value.is_sensitive(), // TODO: Technically these other header values can be sensitive too. _ => false, } @@ -190,18 +171,16 @@ impl Header { use http::header; match *self { - Header::Field { - ref name, .. - } => match *name { - header::AGE | - header::AUTHORIZATION | - header::CONTENT_LENGTH | - header::ETAG | - header::IF_MODIFIED_SINCE | - header::IF_NONE_MATCH | - header::LOCATION | - header::COOKIE | - header::SET_COOKIE => true, + Header::Field { ref name, .. } => match *name { + header::AGE + | header::AUTHORIZATION + | header::CONTENT_LENGTH + | header::ETAG + | header::IF_MODIFIED_SINCE + | header::IF_NONE_MATCH + | header::LOCATION + | header::COOKIE + | header::SET_COOKIE => true, _ => false, }, Header::Path(..) => true, @@ -214,10 +193,7 @@ impl Header { impl From
for Header> { fn from(src: Header) -> Self { match src { - Header::Field { - name, - value, - } => Header::Field { + Header::Field { name, value } => Header::Field { name: Some(name), value, }, @@ -247,7 +223,7 @@ impl<'a> Name<'a> { // TODO: better error handling Err(_) => Err(DecoderError::InvalidStatusCode), } - }, + } } } diff --git a/src/hpack/huffman/table.rs b/src/hpack/huffman/table.rs index 8fe1a7a..560cfaf 100644 --- a/src/hpack/huffman/table.rs +++ b/src/hpack/huffman/table.rs @@ -3,37 +3,37 @@ // (num-bits, bits) pub const ENCODE_TABLE: [(usize, u64); 257] = [ (13, 0x1ff8), - (23, 0x7fffd8), - (28, 0xfffffe2), - (28, 0xfffffe3), - (28, 0xfffffe4), - (28, 0xfffffe5), - (28, 0xfffffe6), - (28, 0xfffffe7), - (28, 0xfffffe8), - (24, 0xffffea), - (30, 0x3ffffffc), - (28, 0xfffffe9), - (28, 0xfffffea), - (30, 0x3ffffffd), - (28, 0xfffffeb), - (28, 0xfffffec), - (28, 0xfffffed), - (28, 0xfffffee), - (28, 0xfffffef), - (28, 0xffffff0), - (28, 0xffffff1), - (28, 0xffffff2), - (30, 0x3ffffffe), - (28, 0xffffff3), - (28, 0xffffff4), - (28, 0xffffff5), - (28, 0xffffff6), - (28, 0xffffff7), - (28, 0xffffff8), - (28, 0xffffff9), - (28, 0xffffffa), - (28, 0xffffffb), + (23, 0x007f_ffd8), + (28, 0x0fff_ffe2), + (28, 0x0fff_ffe3), + (28, 0x0fff_ffe4), + (28, 0x0fff_ffe5), + (28, 0x0fff_ffe6), + (28, 0x0fff_ffe7), + (28, 0x0fff_ffe8), + (24, 0x00ff_ffea), + (30, 0x3fff_fffc), + (28, 0x0fff_ffe9), + (28, 0x0fff_ffea), + (30, 0x3fff_fffd), + (28, 0x0fff_ffeb), + (28, 0x0fff_ffec), + (28, 0x0fff_ffed), + (28, 0x0fff_ffee), + (28, 0x0fff_ffef), + (28, 0x0fff_fff0), + (28, 0x0fff_fff1), + (28, 0x0fff_fff2), + (30, 0x3fff_fffe), + (28, 0x0fff_fff3), + (28, 0x0fff_fff4), + (28, 0x0fff_fff5), + (28, 0x0fff_fff6), + (28, 0x0fff_fff7), + (28, 0x0fff_fff8), + (28, 0x0fff_fff9), + (28, 0x0fff_fffa), + (28, 0x0fff_fffb), (6, 0x14), (10, 0x3f8), (10, 0x3f9), @@ -129,136 +129,136 @@ pub const ENCODE_TABLE: [(usize, u64); 257] = [ (11, 0x7fc), (14, 0x3ffd), (13, 0x1ffd), - (28, 0xffffffc), + (28, 0x0fff_fffc), (20, 0xfffe6), - (22, 0x3fffd2), + (22, 0x003f_ffd2), (20, 0xfffe7), (20, 0xfffe8), - (22, 0x3fffd3), - (22, 0x3fffd4), - (22, 0x3fffd5), - (23, 0x7fffd9), - (22, 0x3fffd6), - (23, 0x7fffda), - (23, 0x7fffdb), - (23, 0x7fffdc), - (23, 0x7fffdd), - (23, 0x7fffde), - (24, 0xffffeb), - (23, 0x7fffdf), - (24, 0xffffec), - (24, 0xffffed), - (22, 0x3fffd7), - (23, 0x7fffe0), - (24, 0xffffee), - (23, 0x7fffe1), - (23, 0x7fffe2), - (23, 0x7fffe3), - (23, 0x7fffe4), - (21, 0x1fffdc), - (22, 0x3fffd8), - (23, 0x7fffe5), - (22, 0x3fffd9), - (23, 0x7fffe6), - (23, 0x7fffe7), - (24, 0xffffef), - (22, 0x3fffda), - (21, 0x1fffdd), + (22, 0x003f_ffd3), + (22, 0x003f_ffd4), + (22, 0x003f_ffd5), + (23, 0x007f_ffd9), + (22, 0x003f_ffd6), + (23, 0x007f_ffda), + (23, 0x007f_ffdb), + (23, 0x007f_ffdc), + (23, 0x007f_ffdd), + (23, 0x007f_ffde), + (24, 0x00ff_ffeb), + (23, 0x007f_ffdf), + (24, 0x00ff_ffec), + (24, 0x00ff_ffed), + (22, 0x003f_ffd7), + (23, 0x007f_ffe0), + (24, 0x00ff_ffee), + (23, 0x007f_ffe1), + (23, 0x007f_ffe2), + (23, 0x007f_ffe3), + (23, 0x007f_ffe4), + (21, 0x001f_ffdc), + (22, 0x003f_ffd8), + (23, 0x007f_ffe5), + (22, 0x003f_ffd9), + (23, 0x007f_ffe6), + (23, 0x007f_ffe7), + (24, 0x00ff_ffef), + (22, 0x003f_ffda), + (21, 0x001f_ffdd), (20, 0xfffe9), - (22, 0x3fffdb), - (22, 0x3fffdc), - (23, 0x7fffe8), - (23, 0x7fffe9), - (21, 0x1fffde), - (23, 0x7fffea), - (22, 0x3fffdd), - (22, 0x3fffde), - (24, 0xfffff0), - (21, 0x1fffdf), - (22, 0x3fffdf), - (23, 0x7fffeb), - (23, 0x7fffec), - (21, 0x1fffe0), - (21, 0x1fffe1), - (22, 0x3fffe0), - (21, 0x1fffe2), - (23, 0x7fffed), - (22, 0x3fffe1), - (23, 0x7fffee), - (23, 0x7fffef), + (22, 0x003f_ffdb), + (22, 0x003f_ffdc), + (23, 0x007f_ffe8), + (23, 0x007f_ffe9), + (21, 0x001f_ffde), + (23, 0x007f_ffea), + (22, 0x003f_ffdd), + (22, 0x003f_ffde), + (24, 0x00ff_fff0), + (21, 0x001f_ffdf), + (22, 0x003f_ffdf), + (23, 0x007f_ffeb), + (23, 0x007f_ffec), + (21, 0x001f_ffe0), + (21, 0x001f_ffe1), + (22, 0x003f_ffe0), + (21, 0x001f_ffe2), + (23, 0x007f_ffed), + (22, 0x003f_ffe1), + (23, 0x007f_ffee), + (23, 0x007f_ffef), (20, 0xfffea), - (22, 0x3fffe2), - (22, 0x3fffe3), - (22, 0x3fffe4), - (23, 0x7ffff0), - (22, 0x3fffe5), - (22, 0x3fffe6), - (23, 0x7ffff1), - (26, 0x3ffffe0), - (26, 0x3ffffe1), + (22, 0x003f_ffe2), + (22, 0x003f_ffe3), + (22, 0x003f_ffe4), + (23, 0x007f_fff0), + (22, 0x003f_ffe5), + (22, 0x003f_ffe6), + (23, 0x007f_fff1), + (26, 0x03ff_ffe0), + (26, 0x03ff_ffe1), (20, 0xfffeb), (19, 0x7fff1), - (22, 0x3fffe7), - (23, 0x7ffff2), - (22, 0x3fffe8), - (25, 0x1ffffec), - (26, 0x3ffffe2), - (26, 0x3ffffe3), - (26, 0x3ffffe4), - (27, 0x7ffffde), - (27, 0x7ffffdf), - (26, 0x3ffffe5), - (24, 0xfffff1), - (25, 0x1ffffed), + (22, 0x003f_ffe7), + (23, 0x007f_fff2), + (22, 0x003f_ffe8), + (25, 0x01ff_ffec), + (26, 0x03ff_ffe2), + (26, 0x03ff_ffe3), + (26, 0x03ff_ffe4), + (27, 0x07ff_ffde), + (27, 0x07ff_ffdf), + (26, 0x03ff_ffe5), + (24, 0x00ff_fff1), + (25, 0x01ff_ffed), (19, 0x7fff2), - (21, 0x1fffe3), - (26, 0x3ffffe6), - (27, 0x7ffffe0), - (27, 0x7ffffe1), - (26, 0x3ffffe7), - (27, 0x7ffffe2), - (24, 0xfffff2), - (21, 0x1fffe4), - (21, 0x1fffe5), - (26, 0x3ffffe8), - (26, 0x3ffffe9), - (28, 0xffffffd), - (27, 0x7ffffe3), - (27, 0x7ffffe4), - (27, 0x7ffffe5), + (21, 0x001f_ffe3), + (26, 0x03ff_ffe6), + (27, 0x07ff_ffe0), + (27, 0x07ff_ffe1), + (26, 0x03ff_ffe7), + (27, 0x07ff_ffe2), + (24, 0x00ff_fff2), + (21, 0x001f_ffe4), + (21, 0x001f_ffe5), + (26, 0x03ff_ffe8), + (26, 0x03ff_ffe9), + (28, 0x0fff_fffd), + (27, 0x07ff_ffe3), + (27, 0x07ff_ffe4), + (27, 0x07ff_ffe5), (20, 0xfffec), - (24, 0xfffff3), + (24, 0x00ff_fff3), (20, 0xfffed), - (21, 0x1fffe6), - (22, 0x3fffe9), - (21, 0x1fffe7), - (21, 0x1fffe8), - (23, 0x7ffff3), - (22, 0x3fffea), - (22, 0x3fffeb), - (25, 0x1ffffee), - (25, 0x1ffffef), - (24, 0xfffff4), - (24, 0xfffff5), - (26, 0x3ffffea), - (23, 0x7ffff4), - (26, 0x3ffffeb), - (27, 0x7ffffe6), - (26, 0x3ffffec), - (26, 0x3ffffed), - (27, 0x7ffffe7), - (27, 0x7ffffe8), - (27, 0x7ffffe9), - (27, 0x7ffffea), - (27, 0x7ffffeb), - (28, 0xffffffe), - (27, 0x7ffffec), - (27, 0x7ffffed), - (27, 0x7ffffee), - (27, 0x7ffffef), - (27, 0x7fffff0), - (26, 0x3ffffee), - (30, 0x3fffffff), + (21, 0x001f_ffe6), + (22, 0x003f_ffe9), + (21, 0x001f_ffe7), + (21, 0x001f_ffe8), + (23, 0x007f_fff3), + (22, 0x003f_ffea), + (22, 0x003f_ffeb), + (25, 0x01ff_ffee), + (25, 0x01ff_ffef), + (24, 0x00ff_fff4), + (24, 0x00ff_fff5), + (26, 0x03ff_ffea), + (23, 0x007f_fff4), + (26, 0x03ff_ffeb), + (27, 0x07ff_ffe6), + (26, 0x03ff_ffec), + (26, 0x03ff_ffed), + (27, 0x07ff_ffe7), + (27, 0x07ff_ffe8), + (27, 0x07ff_ffe9), + (27, 0x07ff_ffea), + (27, 0x07ff_ffeb), + (28, 0x0fff_fffe), + (27, 0x07ff_ffec), + (27, 0x07ff_ffed), + (27, 0x07ff_ffee), + (27, 0x07ff_ffef), + (27, 0x07ff_fff0), + (26, 0x03ff_ffee), + (30, 0x3fff_ffff), ]; // (next-state, byte, flags) diff --git a/src/hpack/mod.rs b/src/hpack/mod.rs index 71e7474..1ec1939 100644 --- a/src/hpack/mod.rs +++ b/src/hpack/mod.rs @@ -1,5 +1,5 @@ -mod encoder; mod decoder; +mod encoder; pub(crate) mod header; mod huffman; mod table; diff --git a/src/hpack/table.rs b/src/hpack/table.rs index f8771b0..dbb6bba 100644 --- a/src/hpack/table.rs +++ b/src/hpack/table.rs @@ -4,9 +4,9 @@ use fnv::FnvHasher; use http::header; use http::method::Method; -use std::{cmp, mem, usize}; use std::collections::VecDeque; use std::hash::{Hash, Hasher}; +use std::{cmp, mem, usize}; /// HPACK encoder table #[derive(Debug)] @@ -80,7 +80,7 @@ impl Table { slots: VecDeque::new(), inserted: 0, size: 0, - max_size: max_size, + max_size, } } else { let capacity = cmp::max(to_raw_capacity(capacity).next_power_of_two(), 8); @@ -91,7 +91,7 @@ impl Table { slots: VecDeque::with_capacity(usable_capacity(capacity)), inserted: 0, size: 0, - max_size: max_size, + max_size, } } } @@ -140,10 +140,7 @@ impl Table { // Right now, if this is true, the header name is always in the // static table. At some point in the future, this might not be true // and this logic will need to be updated. - debug_assert!( - statik.is_some(), - "skip_value_index requires a static name", - ); + debug_assert!(statik.is_some(), "skip_value_index requires a static name",); return Index::new(statik, header); } @@ -313,7 +310,7 @@ impl Table { &mut self.indices[probe], Some(Pos { index: pos_idx, - hash: hash, + hash, }), ); @@ -344,8 +341,8 @@ impl Table { self.inserted = self.inserted.wrapping_add(1); self.slots.push_front(Slot { - hash: hash, - header: header, + hash, + header, next: None, }); } @@ -534,89 +531,89 @@ impl Table { #[cfg(test)] fn assert_valid_state(&self, _msg: &'static str) -> bool { /* - // Checks that the internal map structure is valid - // - // Ensure all hash codes in indices match the associated slot - for pos in &self.indices { - if let Some(pos) = *pos { - let real_idx = pos.index.wrapping_add(self.inserted); - - if real_idx.wrapping_add(1) != 0 { - assert!(real_idx < self.slots.len(), - "out of index; real={}; len={}, msg={}", - real_idx, self.slots.len(), msg); - - assert_eq!(pos.hash, self.slots[real_idx].hash, - "index hash does not match slot; msg={}", msg); - } - } - } - - // Every index is only available once - for i in 0..self.indices.len() { - if self.indices[i].is_none() { - continue; - } - - for j in i+1..self.indices.len() { - assert_ne!(self.indices[i], self.indices[j], - "duplicate indices; msg={}", msg); - } - } - - for (index, slot) in self.slots.iter().enumerate() { - let mut indexed = None; - - // First, see if the slot is indexed - for (i, pos) in self.indices.iter().enumerate() { + // Checks that the internal map structure is valid + // + // Ensure all hash codes in indices match the associated slot + for pos in &self.indices { if let Some(pos) = *pos { let real_idx = pos.index.wrapping_add(self.inserted); - if real_idx == index { - indexed = Some(i); - // Already know that there is no dup, so break - break; + + if real_idx.wrapping_add(1) != 0 { + assert!(real_idx < self.slots.len(), + "out of index; real={}; len={}, msg={}", + real_idx, self.slots.len(), msg); + + assert_eq!(pos.hash, self.slots[real_idx].hash, + "index hash does not match slot; msg={}", msg); } } } - if let Some(actual) = indexed { - // Ensure that it is accessible.. - let desired = desired_pos(self.mask, slot.hash); - let mut probe = desired; - let mut dist = 0; + // Every index is only available once + for i in 0..self.indices.len() { + if self.indices[i].is_none() { + continue; + } - probe_loop!(probe < self.indices.len(), { - assert!(self.indices[probe].is_some(), - "unexpected empty slot; probe={}; hash={:?}; msg={}", - probe, slot.hash, msg); - - let pos = self.indices[probe].unwrap(); - - let their_dist = probe_distance(self.mask, pos.hash, probe); - let real_idx = pos.index.wrapping_add(self.inserted); - - if real_idx == index { - break; - } - - assert!(dist <= their_dist, - "could not find entry; actual={}; desired={};" + - "probe={}, dist={}; their_dist={}; index={}; msg={}", - actual, desired, probe, dist, their_dist, - index.wrapping_sub(self.inserted), msg); - - dist += 1; - }); - } else { - // There is exactly one next link - let cnt = self.slots.iter().map(|s| s.next) - .filter(|n| *n == Some(index.wrapping_sub(self.inserted))) - .count(); - - assert_eq!(1, cnt, "more than one node pointing here; msg={}", msg); + for j in i+1..self.indices.len() { + assert_ne!(self.indices[i], self.indices[j], + "duplicate indices; msg={}", msg); + } } - } - */ + + for (index, slot) in self.slots.iter().enumerate() { + let mut indexed = None; + + // First, see if the slot is indexed + for (i, pos) in self.indices.iter().enumerate() { + if let Some(pos) = *pos { + let real_idx = pos.index.wrapping_add(self.inserted); + if real_idx == index { + indexed = Some(i); + // Already know that there is no dup, so break + break; + } + } + } + + if let Some(actual) = indexed { + // Ensure that it is accessible.. + let desired = desired_pos(self.mask, slot.hash); + let mut probe = desired; + let mut dist = 0; + + probe_loop!(probe < self.indices.len(), { + assert!(self.indices[probe].is_some(), + "unexpected empty slot; probe={}; hash={:?}; msg={}", + probe, slot.hash, msg); + + let pos = self.indices[probe].unwrap(); + + let their_dist = probe_distance(self.mask, pos.hash, probe); + let real_idx = pos.index.wrapping_add(self.inserted); + + if real_idx == index { + break; + } + + assert!(dist <= their_dist, + "could not find entry; actual={}; desired={};" + + "probe={}, dist={}; their_dist={}; index={}; msg={}", + actual, desired, probe, dist, their_dist, + index.wrapping_sub(self.inserted), msg); + + dist += 1; + }); + } else { + // There is exactly one next link + let cnt = self.slots.iter().map(|s| s.next) + .filter(|n| *n == Some(index.wrapping_sub(self.inserted))) + .count(); + + assert_eq!(1, cnt, "more than one node pointing here; msg={}", msg); + } + } + */ // TODO: Ensure linked lists are correct: no cycles, etc... @@ -684,11 +681,13 @@ fn index_static(header: &Header) -> Option<(usize, bool)> { ref value, } => match *name { header::ACCEPT_CHARSET => Some((15, false)), - header::ACCEPT_ENCODING => if value == "gzip, deflate" { - Some((16, true)) - } else { - Some((16, false)) - }, + header::ACCEPT_ENCODING => { + if value == "gzip, deflate" { + Some((16, true)) + } else { + Some((16, false)) + } + } header::ACCEPT_LANGUAGE => Some((17, false)), header::ACCEPT_RANGES => Some((18, false)), header::ACCEPT => Some((19, false)), diff --git a/src/hpack/test/fixture.rs b/src/hpack/test/fixture.rs index 31c55e1..b1636dd 100644 --- a/src/hpack/test/fixture.rs +++ b/src/hpack/test/fixture.rs @@ -5,8 +5,8 @@ use hex::FromHex; use serde_json::Value; use std::fs::File; -use std::io::Cursor; use std::io::prelude::*; +use std::io::Cursor; use std::path::Path; use std::str; @@ -30,13 +30,15 @@ fn test_story(story: Value) { .map(|case| { let case = case.as_object().unwrap(); - let size = case.get("header_table_size") + let size = case + .get("header_table_size") .map(|v| v.as_u64().unwrap() as usize); let wire = case.get("wire").unwrap().as_str().unwrap(); let wire: Vec = FromHex::from_hex(wire.as_bytes()).unwrap(); - let expect: Vec<_> = case.get("headers") + let expect: Vec<_> = case + .get("headers") .unwrap() .as_array() .unwrap() @@ -92,7 +94,8 @@ fn test_story(story: Value) { decoder.queue_size_update(size); } - let mut input: Vec<_> = case.expect + let mut input: Vec<_> = case + .expect .iter() .map(|&(ref name, ref value)| { Header::new(name.clone().into(), value.clone().into()) @@ -123,9 +126,7 @@ struct Case { fn key_str(e: &Header) -> &str { match *e { - Header::Field { - ref name, .. - } => name.as_str(), + Header::Field { ref name, .. } => name.as_str(), Header::Authority(..) => ":authority", Header::Method(..) => ":method", Header::Scheme(..) => ":scheme", @@ -136,9 +137,7 @@ fn key_str(e: &Header) -> &str { fn value_str(e: &Header) -> &str { match *e { - Header::Field { - ref value, .. - } => value.to_str().unwrap(), + Header::Field { ref value, .. } => value.to_str().unwrap(), Header::Authority(ref v) => &**v, Header::Method(ref m) => m.as_str(), Header::Scheme(ref v) => &**v, diff --git a/src/hpack/test/fuzz.rs b/src/hpack/test/fuzz.rs index 7b1858d..9885c5d 100644 --- a/src/hpack/test/fuzz.rs +++ b/src/hpack/test/fuzz.rs @@ -78,17 +78,16 @@ impl FuzzHpack { let low = rng.gen_range(0, high); frame.resizes.extend(&[low, high]); - }, + } 1..=3 => { frame.resizes.push(rng.gen_range(128, MAX_CHUNK * 2)); - }, - _ => {}, + } + _ => {} } let mut is_name_required = true; for _ in 0..rng.gen_range(1, (num - added) + 1) { - let x: f64 = rng.gen_range(0.0, 1.0); let x = x.powi(skew); @@ -100,10 +99,10 @@ impl FuzzHpack { if is_name_required { continue; } - }, + } Header::Field { .. } => { is_name_required = false; - }, + } _ => { // pseudos can't be followed by a header with no name is_name_required = true; @@ -153,7 +152,7 @@ impl FuzzHpack { _ => None, }; expect.push(h); - }, + } Err(value) => { expect.push(Header::Field { name: prev_name.as_ref().cloned().expect("previous header name"), @@ -161,7 +160,6 @@ impl FuzzHpack { }); } } - } let mut input = frame.headers.into_iter(); @@ -193,7 +191,7 @@ impl FuzzHpack { .expect("partial decode"); buf = BytesMut::with_capacity(chunks.pop().unwrap_or(MAX_CHUNK)); - }, + } } } @@ -224,7 +222,7 @@ fn gen_header(g: &mut StdRng) -> Header> { 0 => { let value = gen_string(g, 4, 20); Header::Authority(to_shared(value)) - }, + } 1 => { let method = match g.next_u32() % 6 { 0 => Method::GET, @@ -239,12 +237,12 @@ fn gen_header(g: &mut StdRng) -> Header> { .collect(); Method::from_bytes(&bytes).unwrap() - }, + } _ => unreachable!(), }; Header::Method(method) - }, + } 2 => { let value = match g.next_u32() % 2 { 0 => "http", @@ -253,7 +251,7 @@ fn gen_header(g: &mut StdRng) -> Header> { }; Header::Scheme(to_shared(value.to_string())) - }, + } 3 => { let value = match g.next_u32() % 100 { 0 => "/".to_string(), @@ -262,12 +260,12 @@ fn gen_header(g: &mut StdRng) -> Header> { }; Header::Path(to_shared(value)) - }, + } 4 => { let status = (g.gen::() % 500) + 100; Header::Status(StatusCode::from_u16(status).unwrap()) - }, + } _ => unreachable!(), } } else { @@ -282,10 +280,7 @@ fn gen_header(g: &mut StdRng) -> Header> { value.set_sensitive(true); } - Header::Field { - name, - value, - } + Header::Field { name, value } } } @@ -368,8 +363,9 @@ fn gen_header_name(g: &mut StdRng) -> HeaderName { header::X_DNS_PREFETCH_CONTROL, header::X_FRAME_OPTIONS, header::X_XSS_PROTECTION, - ]).unwrap() - .clone() + ]) + .unwrap() + .clone() } else { let value = gen_string(g, 1, 25); HeaderName::from_bytes(value.as_bytes()).unwrap() diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 3b1f7bb..332bcc9 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -96,11 +96,11 @@ where Connection { state: State::Open, error: None, - codec: codec, + codec, go_away: GoAway::new(), ping_pong: PingPong::new(), settings: Settings::new(), - streams: streams, + streams, _phantom: PhantomData, } } @@ -210,11 +210,11 @@ where // This will also handle flushing `self.codec` ready!(self.streams.poll_complete(cx, &mut self.codec))?; - if self.error.is_some() || self.go_away.should_close_on_idle() { - if !self.streams.has_streams() { - self.go_away_now(Reason::NO_ERROR); - continue; - } + if (self.error.is_some() || self.go_away.should_close_on_idle()) + && !self.streams.has_streams() + { + self.go_away_now(Reason::NO_ERROR); + continue; } return Poll::Pending; @@ -289,25 +289,22 @@ where // The order here matters: // - poll_go_away may buffer a graceful shutdown GOAWAY frame // - If it has, we've also added a PING to be sent in poll_ready - match ready!(self.poll_go_away(cx)?) { - Some(reason) => { - if self.go_away.should_close_now() { - if self.go_away.is_user_initiated() { - // A user initiated abrupt shutdown shouldn't return - // the same error back to the user. - return Poll::Ready(Ok(())); - } else { - return Poll::Ready(Err(RecvError::Connection(reason))); - } + if let Some(reason) = ready!(self.poll_go_away(cx)?) { + if self.go_away.should_close_now() { + if self.go_away.is_user_initiated() { + // A user initiated abrupt shutdown shouldn't return + // the same error back to the user. + return Poll::Ready(Ok(())); + } else { + return Poll::Ready(Err(RecvError::Connection(reason))); } - // Only NO_ERROR should be waiting for idle - debug_assert_eq!( - reason, - Reason::NO_ERROR, - "graceful GOAWAY should be NO_ERROR" - ); } - None => (), + // Only NO_ERROR should be waiting for idle + debug_assert_eq!( + reason, + Reason::NO_ERROR, + "graceful GOAWAY should be NO_ERROR" + ); } ready!(self.poll_ready(cx))?; @@ -364,7 +361,7 @@ where } None => { log::trace!("codec closed"); - self.streams.recv_eof(false).ok().expect("mutex poisoned"); + self.streams.recv_eof(false).expect("mutex poisoned"); return Poll::Ready(Ok(())); } } diff --git a/src/proto/go_away.rs b/src/proto/go_away.rs index 1ac2f2e..10eacf6 100644 --- a/src/proto/go_away.rs +++ b/src/proto/go_away.rs @@ -137,7 +137,7 @@ impl GoAway { } let reason = frame.reason(); - dst.buffer(frame.into()).ok().expect("invalid GOAWAY frame"); + dst.buffer(frame.into()).expect("invalid GOAWAY frame"); return Poll::Ready(Some(Ok(reason))); } else if self.should_close_now() { diff --git a/src/proto/peer.rs b/src/proto/peer.rs index 3adf668..8d327fb 100644 --- a/src/proto/peer.rs +++ b/src/proto/peer.rs @@ -17,7 +17,9 @@ pub(crate) trait Peer { fn is_server() -> bool; fn convert_poll_message( - pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId + pseudo: Pseudo, + fields: HeaderMap, + stream_id: StreamId, ) -> Result; fn is_local_init(id: StreamId) -> bool { @@ -54,7 +56,10 @@ impl Dyn { } pub fn convert_poll_message( - &self, pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId + &self, + pseudo: Pseudo, + fields: HeaderMap, + stream_id: StreamId, ) -> Result { if self.is_server() { crate::server::Peer::convert_poll_message(pseudo, fields, stream_id) diff --git a/src/proto/settings.rs b/src/proto/settings.rs index f35aefa..b37386d 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,7 +1,7 @@ use crate::codec::RecvError; use crate::frame; use crate::proto::*; -use std::task::{Poll, Context}; +use std::task::{Context, Poll}; #[derive(Debug)] pub(crate) struct Settings { @@ -13,9 +13,7 @@ pub(crate) struct Settings { impl Settings { pub fn new() -> Self { - Settings { - pending: None, - } + Settings { pending: None } } pub fn recv_settings(&mut self, frame: frame::Settings) { @@ -52,9 +50,7 @@ impl Settings { let frame = frame::Settings::ack(); // Buffer the settings frame - dst.buffer(frame.into()) - .ok() - .expect("invalid settings frame"); + dst.buffer(frame.into()).expect("invalid settings frame"); log::trace!("ACK sent; applying settings"); diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 4a42d95..f2aaf7b 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -27,17 +27,13 @@ struct Slot { impl Buffer { pub fn new() -> Self { - Buffer { - slab: Slab::new(), - } + Buffer { slab: Slab::new() } } } impl Deque { pub fn new() -> Self { - Deque { - indices: None, - } + Deque { indices: None } } pub fn is_empty(&self) -> bool { @@ -45,42 +41,36 @@ impl Deque { } pub fn push_back(&mut self, buf: &mut Buffer, value: T) { - let key = buf.slab.insert(Slot { - value, - next: None, - }); + let key = buf.slab.insert(Slot { value, next: None }); match self.indices { Some(ref mut idxs) => { buf.slab[idxs.tail].next = Some(key); idxs.tail = key; - }, + } None => { self.indices = Some(Indices { head: key, tail: key, }); - }, + } } } pub fn push_front(&mut self, buf: &mut Buffer, value: T) { - let key = buf.slab.insert(Slot { - value, - next: None, - }); + let key = buf.slab.insert(Slot { value, next: None }); match self.indices { Some(ref mut idxs) => { buf.slab[key].next = Some(idxs.head); idxs.head = key; - }, + } None => { self.indices = Some(Indices { head: key, tail: key, }); - }, + } } } @@ -97,8 +87,8 @@ impl Deque { self.indices = Some(idxs); } - return Some(slot.value); - }, + Some(slot.value) + } None => None, } } diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index 27752c1..bcd07e8 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -133,16 +133,18 @@ impl Counts { // TODO: move this to macro? pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) { - log::trace!("transition_after; stream={:?}; state={:?}; is_closed={:?}; \ - pending_send_empty={:?}; buffered_send_data={}; \ - num_recv={}; num_send={}", - stream.id, - stream.state, - stream.is_closed(), - stream.pending_send.is_empty(), - stream.buffered_send_data, - self.num_recv_streams, - self.num_send_streams); + log::trace!( + "transition_after; stream={:?}; state={:?}; is_closed={:?}; \ + pending_send_empty={:?}; buffered_send_data={}; \ + num_recv={}; num_send={}", + stream.id, + stream.state, + stream.is_closed(), + stream.pending_send.is_empty(), + stream.buffered_send_data, + self.num_recv_streams, + self.num_send_streams + ); if stream.is_closed() { if !stream.is_pending_reset_expiration() { diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index 5d9c3c5..b0fd104 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -200,7 +200,6 @@ impl PartialEq for Window { } } - impl PartialEq for WindowSize { fn eq(&self, other: &Window) -> bool { other.eq(self) @@ -227,7 +226,6 @@ impl PartialOrd for WindowSize { } } - impl ::std::ops::SubAssign for Window { fn sub_assign(&mut self, other: WindowSize) { self.0 -= other as i32; diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index fd2158e..508d9a1 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -12,7 +12,7 @@ mod streams; pub(crate) use self::prioritize::Prioritized; pub(crate) use self::recv::Open; pub(crate) use self::send::PollReset; -pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams}; +pub(crate) use self::streams::{OpaqueStreamRef, StreamRef, Streams}; use self::buffer::Buffer; use self::counts::Counts; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index efa1050..8aa8c5e 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -1,5 +1,5 @@ -use super::*; use super::store::Resolve; +use super::*; use crate::frame::{Reason, StreamId}; @@ -8,9 +8,9 @@ use crate::codec::UserError::*; use bytes::buf::Take; use futures::ready; -use std::{cmp, fmt, mem}; use std::io; use std::task::{Context, Poll, Waker}; +use std::{cmp, fmt, mem}; /// # Warning /// @@ -81,7 +81,6 @@ impl Prioritize { let mut flow = FlowControl::new(); flow.inc_window(config.remote_init_window_sz) - .ok() .expect("invalid initial window size"); flow.assign_capacity(config.remote_init_window_sz); @@ -92,7 +91,7 @@ impl Prioritize { pending_send: store::Queue::new(), pending_capacity: store::Queue::new(), pending_open: store::Queue::new(), - flow: flow, + flow, last_opened_id: StreamId::ZERO, in_flight_data_frame: InFlightData::Nothing, } @@ -203,9 +202,7 @@ impl Prioritize { // The stream has no capacity to send the frame now, save it but // don't notify the connection task. Once additional capacity // becomes available, the frame will be flushed. - stream - .pending_send - .push_back(buffer, frame.into()); + stream.pending_send.push_back(buffer, frame.into()); } Ok(()) @@ -216,7 +213,8 @@ impl Prioritize { &mut self, capacity: WindowSize, stream: &mut store::Ptr, - counts: &mut Counts) { + counts: &mut Counts, + ) { log::trace!( "reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}", stream.id, @@ -338,8 +336,8 @@ impl Prioritize { &mut self, inc: WindowSize, store: &mut R, - counts: &mut Counts) - where + counts: &mut Counts, + ) where R: Resolve, { log::trace!("assign_connection_capacity; inc={}", inc); @@ -419,11 +417,7 @@ impl Prioritize { // TODO: Should prioritization factor into this? let assign = cmp::min(conn_available, additional); - log::trace!( - " assigning; stream={:?}, capacity={}", - stream.id, - assign, - ); + log::trace!(" assigning; stream={:?}, capacity={}", stream.id, assign,); // Assign the capacity to the stream stream.assign_capacity(assign); @@ -440,16 +434,16 @@ impl Prioritize { stream.send_flow.has_unavailable() ); - if stream.send_flow.available() < stream.requested_send_capacity { - if stream.send_flow.has_unavailable() { - // The stream requires additional capacity and the stream's - // window has available capacity, but the connection window - // does not. - // - // In this case, the stream needs to be queued up for when the - // connection has more capacity. - self.pending_capacity.push(stream); - } + if stream.send_flow.available() < stream.requested_send_capacity + && stream.send_flow.has_unavailable() + { + // The stream requires additional capacity and the stream's + // window has available capacity, but the connection window + // does not. + // + // In this case, the stream needs to be queued up for when the + // connection has more capacity. + self.pending_capacity.push(stream); } // If data is buffered and the stream is not pending open, then @@ -515,26 +509,26 @@ impl Prioritize { if let Frame::Data(ref frame) = frame { self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); } - dst.buffer(frame).ok().expect("invalid frame"); + dst.buffer(frame).expect("invalid frame"); // Ensure the codec is ready to try the loop again. ready!(dst.poll_ready(cx))?; // Because, always try to reclaim... self.reclaim_frame(buffer, store, dst); - }, + } None => { // Try to flush the codec. ready!(dst.flush(cx))?; // This might release a data frame... if !self.reclaim_frame(buffer, store, dst) { - return Poll::Ready(Ok(())) + return Poll::Ready(Ok(())); } // No need to poll ready as poll_complete() does this for // us... - }, + } } } } @@ -603,11 +597,12 @@ impl Prioritize { /// Push the frame to the front of the stream's deque, scheduling the /// stream if needed. - fn push_back_frame(&mut self, - frame: Frame, - buffer: &mut Buffer>, - stream: &mut store::Ptr) - { + fn push_back_frame( + &mut self, + frame: Frame, + buffer: &mut Buffer>, + stream: &mut store::Ptr, + ) { // Push the frame to the front of the stream's deque stream.pending_send.push_front(buffer, frame); @@ -665,8 +660,11 @@ impl Prioritize { loop { match self.pending_send.pop(store) { Some(mut stream) => { - log::trace!("pop_frame; stream={:?}; stream.state={:?}", - stream.id, stream.state); + log::trace!( + "pop_frame; stream={:?}; stream.state={:?}", + stream.id, + stream.state + ); // It's possible that this stream, besides having data to send, // is also queued to send a reset, and thus is already in the queue @@ -675,8 +673,11 @@ impl Prioritize { // To be safe, we just always ask the stream. let is_pending_reset = stream.is_pending_reset_expiration(); - log::trace!(" --> stream={:?}; is_pending_reset={:?};", - stream.id, is_pending_reset); + log::trace!( + " --> stream={:?}; is_pending_reset={:?};", + stream.id, + is_pending_reset + ); let frame = match stream.pending_send.pop_front(buffer) { Some(Frame::Data(mut frame)) => { @@ -715,9 +716,7 @@ impl Prioritize { // happen if the remote reduced the stream // window. In this case, we need to buffer the // frame and wait for a window update... - stream - .pending_send - .push_front(buffer, frame.into()); + stream.pending_send.push_front(buffer, frame.into()); continue; } @@ -726,7 +725,8 @@ impl Prioritize { let len = cmp::min(sz, max_len); // Only send up to the stream's window capacity - let len = cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; + let len = + cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; // There *must* be be enough connection level // capacity at this point. @@ -761,20 +761,18 @@ impl Prioritize { frame.set_end_stream(false); } - Frame::Data(frame.map(|buf| { - Prioritized { - inner: buf.take(len), - end_of_stream: eos, - stream: stream.key(), - } + Frame::Data(frame.map(|buf| Prioritized { + inner: buf.take(len), + end_of_stream: eos, + stream: stream.key(), })) - }, - Some(frame) => frame.map(|_| + } + Some(frame) => frame.map(|_| { unreachable!( "Frame::map closure will only be called \ on DATA frames." - ) - ), + ) + }), None => { if let Some(reason) = stream.state.get_scheduled_reset() { stream.state.set_reset(reason); @@ -814,7 +812,7 @@ impl Prioritize { counts.transition_after(stream, is_pending_reset); return Some(frame); - }, + } None => return None, } } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 9c78f7c..6e7bf47 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,15 +1,15 @@ -use std::task::Context; use super::*; -use crate::{frame, proto}; use crate::codec::{RecvError, UserError}; use crate::frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE}; +use crate::{frame, proto}; +use std::task::Context; -use http::{HeaderMap, Response, Request, Method}; use futures::ready; +use http::{HeaderMap, Method, Request, Response}; use std::io; -use std::time::{Duration, Instant}; use std::task::{Poll, Waker}; +use std::time::{Duration, Instant}; #[derive(Debug)] pub(super) struct Recv { @@ -98,7 +98,7 @@ impl Recv { Recv { init_window_sz: config.local_init_window_sz, - flow: flow, + flow, in_flight_data: 0 as WindowSize, next_stream_id: Ok(next_stream_id.into()), pending_window_updates: store::Queue::new(), @@ -186,8 +186,9 @@ impl Recv { return Err(RecvError::Stream { id: stream.id, reason: Reason::PROTOCOL_ERROR, - }.into()) - }, + } + .into()); + } }; stream.content_length = ContentLength::Remaining(content_length); @@ -215,7 +216,7 @@ impl Recv { let mut res = frame::Headers::new( stream.id, frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE), - HeaderMap::new() + HeaderMap::new(), ); res.set_end_stream(); Err(RecvHeaderBlockError::Oversize(Some(res))) @@ -226,7 +227,9 @@ impl Recv { let stream_id = frame.stream_id(); let (pseudo, fields) = frame.into_parts(); - let message = counts.peer().convert_poll_message(pseudo, fields, stream_id)?; + let message = counts + .peer() + .convert_poll_message(pseudo, fields, stream_id)?; // Push the frame onto the stream's recv buffer stream @@ -246,9 +249,7 @@ impl Recv { /// Called by the server to get the request /// /// TODO: Should this fn return `Result`? - pub fn take_request(&mut self, stream: &mut store::Ptr) - -> Request<()> - { + pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> { use super::peer::PollMessage::*; match stream.pending_recv.pop_front(&mut self.buffer) { @@ -261,20 +262,19 @@ impl Recv { pub fn poll_pushed( &mut self, cx: &Context, - stream: &mut store::Ptr + stream: &mut store::Ptr, ) -> Poll, store::Key), proto::Error>>> { use super::peer::PollMessage::*; let mut ppp = stream.pending_push_promises.take(); - let pushed = ppp.pop(stream.store_mut()).map( - |mut pushed| match pushed.pending_recv.pop_front(&mut self.buffer) { - Some(Event::Headers(Server(headers))) => - (headers, pushed.key()), + let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| { + match pushed.pending_recv.pop_front(&mut self.buffer) { + Some(Event::Headers(Server(headers))) => (headers, pushed.key()), // When frames are pushed into the queue, it is verified that // the first frame is a HEADERS frame. - _ => panic!("Headers not set on pushed stream") + _ => panic!("Headers not set on pushed stream"), } - ); + }); stream.pending_push_promises = ppp; if let Some(p) = pushed { Poll::Ready(Some(Ok(p))) @@ -301,14 +301,14 @@ impl Recv { // If the buffer is not empty, then the first frame must be a HEADERS // frame or the user violated the contract. match stream.pending_recv.pop_front(&mut self.buffer) { - Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response.into())), + Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)), Some(_) => panic!("poll_response called after response returned"), None => { stream.state.ensure_recv_open()?; stream.recv_task = Some(cx.waker().clone()); Poll::Pending - }, + } } } @@ -341,11 +341,7 @@ impl Recv { } /// Releases capacity of the connection - pub fn release_connection_capacity( - &mut self, - capacity: WindowSize, - task: &mut Option, - ) { + pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option) { log::trace!( "release_connection_capacity; size={}, connection in_flight_data={}", capacity, @@ -386,7 +382,6 @@ impl Recv { // Assign capacity to stream stream.recv_flow.assign_capacity(capacity); - if stream.recv_flow.unclaimed_capacity().is_some() { // Queue the stream for sending the WINDOW_UPDATE frame. self.pending_window_updates.push(stream); @@ -400,11 +395,7 @@ impl Recv { } /// Release any unclaimed capacity for a closed stream. - pub fn release_closed_capacity( - &mut self, - stream: &mut store::Ptr, - task: &mut Option, - ) { + pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option) { debug_assert_eq!(stream.ref_count, 0); if stream.in_flight_recv_data == 0 { @@ -417,10 +408,7 @@ impl Recv { stream.in_flight_recv_data, ); - self.release_connection_capacity( - stream.in_flight_recv_data, - task, - ); + self.release_connection_capacity(stream.in_flight_recv_data, task); stream.in_flight_recv_data = 0; self.clear_recv_buffer(stream); @@ -485,9 +473,7 @@ impl Recv { return false; } - stream - .pending_recv - .is_empty() + stream.pending_recv.is_empty() } pub fn recv_data( @@ -522,7 +508,6 @@ impl Recv { stream.recv_flow.window_size() ); - if is_ignoring_frame { log::trace!( "recv_data; frame ignored on locally reset {:?} for some time", @@ -609,7 +594,7 @@ impl Recv { // the capacity as available to be reclaimed. When the available // capacity meets a threshold, a WINDOW_UPDATE is then sent. self.release_connection_capacity(sz, &mut None); - return Ok(()); + Ok(()) } pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> { @@ -670,7 +655,7 @@ impl Recv { // MUST reset the promised stream with a stream error" if let Some(content_length) = req.headers().get(header::CONTENT_LENGTH) { match parse_u64(content_length.as_bytes()) { - Ok(0) => {}, + Ok(0) => {} otherwise => { proto_err!(stream: "recv_push_promise; promised request has content-length {:?}; promised_id={:?}", @@ -681,7 +666,7 @@ impl Recv { id: promised_id, reason: Reason::PROTOCOL_ERROR, }); - }, + } } } // "The server MUST include a method in the :method pseudo-header field @@ -699,7 +684,9 @@ impl Recv { }); } use super::peer::PollMessage::*; - stream.pending_recv.push_back(&mut self.buffer, Event::Headers(Server(req))); + stream + .pending_recv + .push_back(&mut self.buffer, Event::Headers(Server(req))); stream.notify_recv(); Ok(()) } @@ -707,14 +694,17 @@ impl Recv { fn safe_and_cacheable(method: &Method) -> bool { // Cacheable: https://httpwg.org/specs/rfc7231.html#cacheable.methods // Safe: https://httpwg.org/specs/rfc7231.html#safe.methods - return method == Method::GET || method == Method::HEAD; + method == Method::GET || method == Method::HEAD } /// Ensures that `id` is not in the `Idle` state. pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { if let Ok(next) = self.next_stream_id { if id >= next { - log::debug!("stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}", id); + log::debug!( + "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}", + id + ); return Err(Reason::PROTOCOL_ERROR); } } @@ -726,7 +716,9 @@ impl Recv { /// Handle remote sending an explicit RST_STREAM. pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { // Notify the stream - stream.state.recv_reset(frame.reason(), stream.is_pending_send); + stream + .state + .recv_reset(frame.reason(), stream.is_pending_send); stream.notify_send(); stream.notify_recv(); @@ -777,10 +769,7 @@ impl Recv { pub fn may_have_created_stream(&self, id: StreamId) -> bool { if let Ok(next_id) = self.next_stream_id { // Peer::is_local_init should have been called beforehand - debug_assert_eq!( - id.is_server_initiated(), - next_id.is_server_initiated(), - ); + debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); id < next_id } else { true @@ -788,9 +777,7 @@ impl Recv { } /// Returns true if the remote peer can reserve a stream with the given ID. - pub fn ensure_can_reserve(&self) - -> Result<(), RecvError> - { + pub fn ensure_can_reserve(&self) -> Result<(), RecvError> { if !self.is_push_enabled { proto_err!(conn: "recv_push_promise: push is disabled"); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); @@ -800,11 +787,7 @@ impl Recv { } /// Add a locally reset stream to queue to be eventually reaped. - pub fn enqueue_reset_expiration( - &mut self, - stream: &mut store::Ptr, - counts: &mut Counts, - ) { + pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { if !stream.state.is_local_reset() || stream.is_pending_reset_expiration() { return; } @@ -843,9 +826,7 @@ impl Recv { let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM); // Buffer the frame - dst.buffer(frame.into()) - .ok() - .expect("invalid RST_STREAM frame"); + dst.buffer(frame.into()).expect("invalid RST_STREAM frame"); } self.refused = None; @@ -864,11 +845,12 @@ impl Recv { } } - pub fn clear_queues(&mut self, - clear_pending_accept: bool, - store: &mut Store, - counts: &mut Counts) - { + pub fn clear_queues( + &mut self, + clear_pending_accept: bool, + store: &mut Store, + counts: &mut Counts, + ) { self.clear_stream_window_update_queue(store, counts); self.clear_all_reset_streams(store, counts); @@ -921,7 +903,7 @@ impl Recv { /// Send connection level window update fn send_connection_window_update( &mut self, - cx: &mut Context, + cx: &mut Context, dst: &mut Codec>, ) -> Poll> where @@ -936,13 +918,11 @@ impl Recv { // Buffer the WINDOW_UPDATE frame dst.buffer(frame.into()) - .ok() .expect("invalid WINDOW_UPDATE frame"); // Update flow control self.flow .inc_window(incr) - .ok() .expect("unexpected flow control state"); } @@ -992,14 +972,12 @@ impl Recv { // Buffer it dst.buffer(frame.into()) - .ok() .expect("invalid WINDOW_UPDATE frame"); // Update flow control stream .recv_flow .inc_window(incr) - .ok() .expect("unexpected flow control state"); } }) @@ -1010,7 +988,11 @@ impl Recv { self.pending_accept.pop(store).map(|ptr| ptr.key()) } - pub fn poll_data(&mut self, cx: &Context, stream: &mut Stream) -> Poll>> { + pub fn poll_data( + &mut self, + cx: &Context, + stream: &mut Stream, + ) -> Poll>> { // TODO: Return error when the stream is reset match stream.pending_recv.pop_front(&mut self.buffer) { Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))), @@ -1030,7 +1012,7 @@ impl Recv { // No more data frames Poll::Ready(None) - }, + } None => self.schedule_recv(cx, stream), } } @@ -1047,12 +1029,16 @@ impl Recv { stream.pending_recv.push_front(&mut self.buffer, event); Poll::Pending - }, + } None => self.schedule_recv(cx, stream), } } - fn schedule_recv(&mut self, cx: &Context, stream: &mut Stream) -> Poll>> { + fn schedule_recv( + &mut self, + cx: &Context, + stream: &mut Stream, + ) -> Poll>> { if stream.state.ensure_recv_open()? { // Request to get notified once more frames arrive stream.recv_task = Some(cx.waker().clone()); @@ -1112,7 +1098,7 @@ fn parse_u64(src: &[u8]) -> Result { } ret *= 10; - ret += (d - b'0') as u64; + ret += u64::from(d - b'0'); } Ok(ret) diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 4a723ce..70e0fe1 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -325,13 +325,7 @@ impl Send { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { log::debug!("recv_stream_window_update !!; err={:?}", e); - self.send_reset( - Reason::FLOW_CONTROL_ERROR.into(), - buffer, - stream, - counts, - task, - ); + self.send_reset(Reason::FLOW_CONTROL_ERROR, buffer, stream, counts, task); return Err(e); } diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 1493371..cd162be 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -1,7 +1,7 @@ use std::io; -use crate::codec::{RecvError, UserError}; use crate::codec::UserError::*; +use crate::codec::{RecvError, UserError}; use crate::frame::Reason; use crate::proto::{self, PollReset}; @@ -94,37 +94,40 @@ impl State { let local = Streaming; self.inner = match self.inner { - Idle => if eos { - HalfClosedLocal(AwaitingHeaders) - } else { - Open { - local, - remote: AwaitingHeaders, + Idle => { + if eos { + HalfClosedLocal(AwaitingHeaders) + } else { + Open { + local, + remote: AwaitingHeaders, + } } - }, + } Open { local: AwaitingHeaders, remote, - } => if eos { - HalfClosedLocal(remote) - } else { - Open { - local, - remote, + } => { + if eos { + HalfClosedLocal(remote) + } else { + Open { local, remote } } - }, - HalfClosedRemote(AwaitingHeaders) => if eos { - Closed(Cause::EndStream) - } else { - HalfClosedRemote(local) - }, + } + HalfClosedRemote(AwaitingHeaders) => { + if eos { + Closed(Cause::EndStream) + } else { + HalfClosedRemote(local) + } + } _ => { // All other transitions result in a protocol error return Err(UnexpectedFrameType); - }, + } }; - return Ok(()); + Ok(()) } /// Opens the receive-half of the stream when a HEADERS frame is received. @@ -146,7 +149,7 @@ impl State { remote, } } - }, + } ReservedRemote => { initial = true; @@ -155,31 +158,32 @@ impl State { } else { HalfClosedLocal(Streaming) } - }, + } Open { local, remote: AwaitingHeaders, - } => if eos { - HalfClosedRemote(local) - } else { - Open { - local, - remote, + } => { + if eos { + HalfClosedRemote(local) + } else { + Open { local, remote } } - }, - HalfClosedLocal(AwaitingHeaders) => if eos { - Closed(Cause::EndStream) - } else { - HalfClosedLocal(remote) - }, + } + HalfClosedLocal(AwaitingHeaders) => { + if eos { + Closed(Cause::EndStream) + } else { + HalfClosedLocal(remote) + } + } state => { // All other transitions result in a protocol error proto_err!(conn: "recv_open: in unexpected state {:?}", state); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); - }, + } }; - return Ok(initial); + Ok(initial) } /// Transition from Idle -> ReservedRemote @@ -188,7 +192,7 @@ impl State { Idle => { self.inner = ReservedRemote; Ok(()) - }, + } state => { proto_err!(conn: "reserve_remote: in unexpected state {:?}", state); Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) @@ -199,19 +203,17 @@ impl State { /// Indicates that the remote side will not send more data to the local. pub fn recv_close(&mut self) -> Result<(), RecvError> { match self.inner { - Open { - local, .. - } => { + Open { local, .. } => { // The remote side will continue to receive data. log::trace!("recv_close: Open => HalfClosedRemote({:?})", local); self.inner = HalfClosedRemote(local); Ok(()) - }, + } HalfClosedLocal(..) => { log::trace!("recv_close: HalfClosedLocal => Closed"); self.inner = Closed(Cause::EndStream); Ok(()) - }, + } state => { proto_err!(conn: "recv_close: in unexpected state {:?}", state); Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) @@ -228,7 +230,7 @@ impl State { match self.inner { // If the stream is already in a `Closed` state, do nothing, // provided that there are no frames still in the send queue. - Closed(..) if !queued => {}, + Closed(..) if !queued => {} // A notionally `Closed` stream may still have queued frames in // the following cases: // @@ -246,11 +248,12 @@ impl State { state => { log::trace!( "recv_reset; reason={:?}; state={:?}; queued={:?}", - reason, state, queued + reason, + state, + queued ); self.inner = Closed(Cause::Proto(reason)); - }, - + } } } @@ -259,20 +262,20 @@ impl State { use crate::proto::Error::*; match self.inner { - Closed(..) => {}, + Closed(..) => {} _ => { log::trace!("recv_err; err={:?}", err); self.inner = Closed(match *err { Proto(reason) => Cause::LocallyReset(reason), Io(..) => Cause::Io, }); - }, + } } } pub fn recv_eof(&mut self) { match self.inner { - Closed(..) => {}, + Closed(..) => {} s => { log::trace!("recv_eof; state={:?}", s); self.inner = Closed(Cause::Io); @@ -283,17 +286,15 @@ impl State { /// Indicates that the local side will not send more data to the local. pub fn send_close(&mut self) { match self.inner { - Open { - remote, .. - } => { + Open { remote, .. } => { // The remote side will continue to receive data. log::trace!("send_close: Open => HalfClosedLocal({:?})", remote); self.inner = HalfClosedLocal(remote); - }, + } HalfClosedRemote(..) => { log::trace!("send_close: HalfClosedRemote => Closed"); self.inner = Closed(Cause::EndStream); - }, + } state => panic!("send_close: unexpected state {:?}", state), } } @@ -343,8 +344,7 @@ impl State { pub fn is_send_streaming(&self) -> bool { match self.inner { Open { - local: Streaming, - .. + local: Streaming, .. } => true, HalfClosedRemote(Streaming) => true, _ => false, @@ -368,8 +368,7 @@ impl State { pub fn is_recv_streaming(&self) -> bool { match self.inner { Open { - remote: Streaming, - .. + remote: Streaming, .. } => true, HalfClosedLocal(Streaming) => true, _ => false, @@ -407,12 +406,11 @@ impl State { pub fn ensure_recv_open(&self) -> Result { // TODO: Is this correct? match self.inner { - Closed(Cause::Proto(reason)) | - Closed(Cause::LocallyReset(reason)) | - Closed(Cause::Scheduled(reason)) => Err(proto::Error::Proto(reason)), + Closed(Cause::Proto(reason)) + | Closed(Cause::LocallyReset(reason)) + | Closed(Cause::Scheduled(reason)) => Err(proto::Error::Proto(reason)), Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())), - Closed(Cause::EndStream) | - HalfClosedRemote(..) => Ok(false), + Closed(Cause::EndStream) | HalfClosedRemote(..) => Ok(false), _ => Ok(true), } } @@ -420,15 +418,15 @@ impl State { /// Returns a reason if the stream has been reset. pub(super) fn ensure_reason(&self, mode: PollReset) -> Result, crate::Error> { match self.inner { - Closed(Cause::Proto(reason)) | - Closed(Cause::LocallyReset(reason)) | - Closed(Cause::Scheduled(reason)) => Ok(Some(reason)), + Closed(Cause::Proto(reason)) + | Closed(Cause::LocallyReset(reason)) + | Closed(Cause::Scheduled(reason)) => Ok(Some(reason)), Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into()).into()), - Open { local: Streaming, .. } | - HalfClosedRemote(Streaming) => match mode { - PollReset::AwaitingHeaders => { - Err(UserError::PollResetAfterSendResponse.into()) - }, + Open { + local: Streaming, .. + } + | HalfClosedRemote(Streaming) => match mode { + PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()), PollReset::Streaming => Ok(None), }, _ => Ok(None), @@ -438,9 +436,7 @@ impl State { impl Default for State { fn default() -> State { - State { - inner: Inner::Idle, - } + State { inner: Inner::Idle } } } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 672657a..9beec45 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -118,9 +118,7 @@ impl Store { use self::indexmap::map::Entry::*; match self.ids.entry(id) { - Occupied(e) => Entry::Occupied(OccupiedEntry { - ids: e, - }), + Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }), Vacant(e) => Entry::Vacant(VacantEntry { ids: e, slab: &mut self.slab, @@ -143,10 +141,7 @@ impl Store { }; f(Ptr { - key: Key { - index, - stream_id, - }, + key: Key { index, stream_id }, store: self, })?; @@ -167,10 +162,7 @@ impl Store { impl Resolve for Store { fn resolve(&mut self, key: Key) -> Ptr { - Ptr { - key: key, - store: self, - } + Ptr { key, store: self } } } @@ -267,14 +259,14 @@ where // Update the tail pointer idxs.tail = stream.key(); - }, + } None => { log::trace!(" -> first entry"); self.indices = Some(store::Indices { head: stream.key(), tail: stream.key(), }); - }, + } } true @@ -356,7 +348,7 @@ impl<'a> Ptr<'a> { impl<'a> Resolve for Ptr<'a> { fn resolve(&mut self, key: Key) -> Ptr { Ptr { - key: key, + key, store: &mut *self.store, } } @@ -388,10 +380,7 @@ impl<'a> OccupiedEntry<'a> { pub fn key(&self) -> Key { let stream_id = *self.ids.key(); let index = *self.ids.get(); - Key { - index, - stream_id, - } + Key { index, stream_id } } } @@ -406,9 +395,6 @@ impl<'a> VacantEntry<'a> { // Insert the handle in the ID map self.ids.insert(index); - Key { - index, - stream_id, - } + Key { index, stream_id } } } diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index d3caf5c..075d71f 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -1,8 +1,8 @@ use super::*; +use std::task::{Context, Waker}; use std::time::Instant; use std::usize; -use std::task::{Context, Waker}; /// Tracks Stream related state /// @@ -133,23 +133,17 @@ pub(super) struct NextOpen; pub(super) struct NextResetExpire; impl Stream { - pub fn new( - id: StreamId, - init_send_window: WindowSize, - init_recv_window: WindowSize, - ) -> Stream { + pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { let mut send_flow = FlowControl::new(); let mut recv_flow = FlowControl::new(); recv_flow .inc_window(init_recv_window) - .ok() .expect("invalid initial receive window"); recv_flow.assign_capacity(init_recv_window); send_flow .inc_window(init_send_window) - .ok() .expect("invalid initial send window size"); Stream { @@ -161,7 +155,7 @@ impl Stream { // ===== Fields related to sending ===== next_pending_send: None, is_pending_send: false, - send_flow: send_flow, + send_flow, requested_send_capacity: 0, buffered_send_data: 0, send_task: None, @@ -175,7 +169,7 @@ impl Stream { // ===== Fields related to receiving ===== next_pending_accept: None, is_pending_accept: false, - recv_flow: recv_flow, + recv_flow, in_flight_recv_data: 0, next_window_update: None, is_pending_window_update: false, @@ -247,8 +241,12 @@ impl Stream { self.send_capacity_inc = true; self.send_flow.assign_capacity(capacity); - log::trace!(" assigned capacity to stream; available={}; buffered={}; id={:?}", - self.send_flow.available(), self.buffered_send_data, self.id); + log::trace!( + " assigned capacity to stream; available={}; buffered={}; id={:?}", + self.send_flow.available(), + self.buffered_send_data, + self.id + ); // Only notify if the capacity exceeds the amount of buffered data if self.send_flow.available() > self.buffered_send_data { @@ -265,7 +263,7 @@ impl Stream { None => return Err(()), }, ContentLength::Head => return Err(()), - _ => {}, + _ => {} } Ok(()) diff --git a/src/server.rs b/src/server.rs index dc9d257..c2060e2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -465,13 +465,10 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Always try to advance the internal state. Getting Pending also is // needed to allow this function to return Pending. - match self.poll_close(cx)? { - Poll::Ready(_) => { - // If the socket is closed, don't return anything - // TODO: drop any pending streams - return Poll::Ready(None); - } - _ => {} + if let Poll::Ready(_) = self.poll_close(cx)? { + // If the socket is closed, don't return anything + // TODO: drop any pending streams + return Poll::Ready(None); } if let Some(inner) = self.connection.next_incoming() { diff --git a/tests/h2-support/src/client_ext.rs b/tests/h2-support/src/client_ext.rs index 4320301..c469a81 100644 --- a/tests/h2-support/src/client_ext.rs +++ b/tests/h2-support/src/client_ext.rs @@ -1,6 +1,6 @@ use bytes::IntoBuf; -use http::Request; use h2::client::{ResponseFuture, SendRequest}; +use http::Request; /// Extend the `h2::client::SendRequest` type with convenience methods. pub trait SendRequestExt { @@ -22,7 +22,7 @@ where .expect("valid uri"); let (fut, _tx) = self - .send_request(req, /*eos =*/true) + .send_request(req, /*eos =*/ true) .expect("send_request"); fut diff --git a/tests/h2-support/src/frames.rs b/tests/h2-support/src/frames.rs index 7082e03..10b99ad 100644 --- a/tests/h2-support/src/frames.rs +++ b/tests/h2-support/src/frames.rs @@ -137,8 +137,7 @@ impl Mock { Mock(frame) } - pub fn scheme(self, value: &str) -> Self - { + pub fn scheme(self, value: &str) -> Self { let (id, mut pseudo, fields) = self.into_parts(); let value = value.parse().unwrap(); @@ -206,7 +205,6 @@ impl From> for SendFrame { } } - // PushPromise helpers impl Mock { @@ -240,7 +238,6 @@ impl Mock { Mock(frame) } - fn into_parts(self) -> (StreamId, StreamId, frame::Pseudo, HeaderMap) { assert!(self.0.is_end_headers(), "unset eoh will be lost"); let id = self.0.stream_id(); @@ -280,10 +277,7 @@ impl Mock { } pub fn reason(self, reason: frame::Reason) -> Self { - Mock(frame::GoAway::new( - self.0.last_stream_id(), - reason, - )) + Mock(frame::GoAway::new(self.0.last_stream_id(), reason)) } } diff --git a/util/genfixture/src/main.rs b/util/genfixture/src/main.rs index c8aec60..a6d7307 100644 --- a/util/genfixture/src/main.rs +++ b/util/genfixture/src/main.rs @@ -1,8 +1,8 @@ use walkdir::WalkDir; +use std::collections::HashMap; use std::env; use std::path::Path; -use std::collections::HashMap; fn main() { let args: Vec<_> = env::args().collect(); @@ -30,7 +30,9 @@ fn main() { // Now, split that into the group and the name let module = fixture_path.split("/").next().unwrap(); - tests.entry(module.to_string()).or_insert(vec![]) + tests + .entry(module.to_string()) + .or_insert(vec![]) .push(fixture_path.to_string()); } @@ -49,9 +51,7 @@ fn main() { println!(" {} => {{", module); for test in tests { - let ident = test - .split("/").nth(1).unwrap() - .split(".").next().unwrap(); + let ident = test.split("/").nth(1).unwrap().split(".").next().unwrap(); println!(" ({}, {:?});", ident, test); } diff --git a/util/genhuff/src/main.rs b/util/genhuff/src/main.rs index 30b6845..2d5b0ba 100644 --- a/util/genhuff/src/main.rs +++ b/util/genhuff/src/main.rs @@ -98,18 +98,17 @@ impl Node { } } - fn compute_transition(&self, - byte: Option, - start: &Node, - root: &Node, - steps_remaining: usize) - { + fn compute_transition( + &self, + byte: Option, + start: &Node, + root: &Node, + steps_remaining: usize, + ) { if steps_remaining == 0 { let (byte, target) = match byte { Some(256) => (None, None), - _ => { - (byte, Some(self.id.unwrap_or(0))) - } + _ => (byte, Some(self.id.unwrap_or(0))), }; start.transitions.borrow_mut().push(Transition { @@ -257,8 +256,6 @@ pub fn main() { println!("];"); } - - const TABLE: &'static str = r##" ( 0) |11111111|11000 1ff8 [13] ( 1) |11111111|11111111|1011000 7fffd8 [23]