diff --git a/.rustfmt.toml b/.rustfmt.toml index 8dda043..3e26058 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -35,7 +35,7 @@ where_pred_indent = "Visual" generics_indent = "Block" struct_lit_style = "Block" struct_lit_multiline_style = "ForceMulti" -fn_call_style = "Visual" +fn_call_style = "Block" report_todo = "Never" report_fixme = "Never" chain_indent = "Block" @@ -57,7 +57,7 @@ wrap_comments = false comment_width = 80 normalize_comments = false wrap_match_arms = true -match_block_trailing_comma = false +match_block_trailing_comma = true indent_match_arms = true match_pattern_separator_break_point = "Back" closure_block_indent_threshold = 0 diff --git a/examples/akamai.rs b/examples/akamai.rs index 0a16268..5f27771 100644 --- a/examples/akamai.rs +++ b/examples/akamai.rs @@ -27,11 +27,12 @@ pub fn main() { let _ = env_logger::init(); let tls_client_config = std::sync::Arc::new({ - let mut c = rustls::ClientConfig::new(); - c.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - c.alpn_protocols.push(ALPN_H2.to_owned()); - c - }); + let mut c = rustls::ClientConfig::new(); + c.root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + c.alpn_protocols.push(ALPN_H2.to_owned()); + c + }); // Sync DNS resolution. let addr = "http2.akamai.com:443" diff --git a/examples/client.rs b/examples/client.rs index 6e66391..5088ad0 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -36,10 +36,10 @@ impl Future for Process { match try_ready!(self.body.poll()) { Some(chunk) => { println!("GOT CHUNK = {:?}", chunk); - } + }, None => { self.trailers = true; - } + }, } } } diff --git a/examples/server-tr.rs b/examples/server-tr.rs index 4c002b2..56a472f 100644 --- a/examples/server-tr.rs +++ b/examples/server-tr.rs @@ -1,4 +1,3 @@ - extern crate bytes; extern crate env_logger; extern crate futures; @@ -57,8 +56,9 @@ pub fn main() { Ok(()) }).and_then(|_| { - println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! \ - ~~~~~~~~~~~"); + println!( + "~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~" + ); Ok(()) }) }) diff --git a/examples/server.rs b/examples/server.rs index 6f4acbd..9d34f85 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,4 +1,3 @@ - extern crate bytes; extern crate env_logger; extern crate futures; @@ -49,8 +48,9 @@ pub fn main() { Ok(()) }).and_then(|_| { - println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! \ - ~~~~~~~~~~~"); + println!( + "~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~" + ); Ok(()) }) }) diff --git a/src/client.rs b/src/client.rs index 7bfab0e..f3a38ce 100644 --- a/src/client.rs +++ b/src/client.rs @@ -80,9 +80,9 @@ where Ok(AsyncSink::Ready) => { let connection = Connection::new(codec); Ok(Client { - connection, - }) - } + connection, + }) + }, Ok(_) => unreachable!(), Err(e) => Err(::Error::from(e)), } @@ -294,13 +294,15 @@ impl proto::Peer for Peer { fn convert_send_message(id: StreamId, request: Self::Send, end_of_stream: bool) -> Headers { use http::request::Parts; - let (Parts { - method, - uri, - headers, - .. - }, - _) = request.into_parts(); + let ( + Parts { + method, + uri, + headers, + .. + }, + _, + ) = request.into_parts(); // Build the set pseudo header set. All requests will include `method` // and `path`. @@ -332,10 +334,10 @@ impl proto::Peer for Peer { // TODO: Should there be more specialized handling for different // kinds of errors return Err(RecvError::Stream { - id: stream_id, - reason: ProtocolError, - }); - } + id: stream_id, + reason: ProtocolError, + }); + }, }; *response.headers_mut() = fields; diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index a184aa0..16efd70 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -70,24 +70,24 @@ impl FramedRead { let res = frame::Settings::load(head, &bytes[frame::HEADER_LEN..]); res.map_err(|_| Connection(ProtocolError))?.into() - } + }, Kind::Ping => { let res = frame::Ping::load(head, &bytes[frame::HEADER_LEN..]); res.map_err(|_| Connection(ProtocolError))?.into() - } + }, Kind::WindowUpdate => { let res = frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..]); res.map_err(|_| Connection(ProtocolError))?.into() - } + }, Kind::Data => { let _ = bytes.split_to(frame::HEADER_LEN); let res = frame::Data::load(head, bytes.freeze()); // TODO: Should this always be connection level? Probably not... res.map_err(|_| Connection(ProtocolError))?.into() - } + }, Kind::Headers => { // Drop the frame header // TODO: Change to drain: carllerche/bytes#130 @@ -101,23 +101,23 @@ impl FramedRead { // treat this as a stream error (Section 5.4.2) of type // `PROTOCOL_ERROR`. return Err(Stream { - id: head.stream_id(), - reason: ProtocolError, - }); - } + id: head.stream_id(), + reason: ProtocolError, + }); + }, _ => return Err(Connection(ProtocolError)), }; if headers.is_end_headers() { // Load the HPACK encoded headers & return the frame match headers.load_hpack(payload, &mut self.hpack) { - Ok(_) => {} + Ok(_) => {}, Err(frame::Error::MalformedMessage) => { return Err(Stream { - id: head.stream_id(), - reason: ProtocolError, - }); - } + id: head.stream_id(), + reason: ProtocolError, + }); + }, Err(_) => return Err(Connection(ProtocolError)), } @@ -125,25 +125,25 @@ impl FramedRead { } else { // Defer loading the frame self.partial = Some(Partial { - frame: Continuable::Headers(headers), - buf: payload, - }); + frame: Continuable::Headers(headers), + buf: payload, + }); return Ok(None); } - } + }, Kind::Reset => { let res = frame::Reset::load(head, &bytes[frame::HEADER_LEN..]); res.map_err(|_| Connection(ProtocolError))?.into() - } + }, Kind::GoAway => { let res = frame::GoAway::load(&bytes[frame::HEADER_LEN..]); res.map_err(|_| Connection(ProtocolError))?.into() - } + }, Kind::PushPromise => { let res = frame::PushPromise::load(head, &bytes[frame::HEADER_LEN..]); res.map_err(|_| Connection(ProtocolError))?.into() - } + }, Kind::Priority => { if head.stream_id() == 0 { // Invalid stream identifier @@ -157,13 +157,13 @@ impl FramedRead { // treat this as a stream error (Section 5.4.2) of type // `PROTOCOL_ERROR`. return Err(Stream { - id: head.stream_id(), - reason: ProtocolError, - }); - } + id: head.stream_id(), + reason: ProtocolError, + }); + }, Err(_) => return Err(Connection(ProtocolError)), } - } + }, Kind::Continuation => { // TODO: Un-hack this let end_of_headers = (head.flag() & 0x4) == 0x4; @@ -189,24 +189,24 @@ impl FramedRead { } match frame.load_hpack(partial.buf, &mut self.hpack) { - Ok(_) => {} + Ok(_) => {}, Err(frame::Error::MalformedMessage) => { return Err(Stream { - id: head.stream_id(), - reason: ProtocolError, - }); - } + id: head.stream_id(), + reason: ProtocolError, + }); + }, Err(_) => return Err(Connection(ProtocolError)), } frame.into() - } + }, } - } + }, Kind::Unknown => { // Unknown frames are ignored return Ok(None); - } + }, }; Ok(Some(frame)) diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index 1d298c1..c4ed185 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -120,32 +120,32 @@ where // Save off the last frame... self.last_data_frame = Some(v); } - } + }, Frame::Headers(v) => { if let Some(continuation) = v.encode(&mut self.hpack, self.buf.get_mut()) { self.next = Some(Next::Continuation(continuation)); } - } + }, Frame::PushPromise(v) => { debug!("unimplemented PUSH_PROMISE write; frame={:?}", v); unimplemented!(); - } + }, Frame::Settings(v) => { v.encode(self.buf.get_mut()); trace!("encoded settings; rem={:?}", self.buf.remaining()); - } + }, Frame::GoAway(v) => { v.encode(self.buf.get_mut()); trace!("encoded go_away; rem={:?}", self.buf.remaining()); - } + }, Frame::Ping(v) => { v.encode(self.buf.get_mut()); trace!("encoded ping; rem={:?}", self.buf.remaining()); - } + }, Frame::WindowUpdate(v) => { v.encode(self.buf.get_mut()); trace!("encoded window_update; rem={:?}", self.buf.remaining()); - } + }, Frame::Priority(_) => { /* @@ -153,11 +153,11 @@ where trace!("encoded priority; rem={:?}", self.buf.remaining()); */ unimplemented!(); - } + }, Frame::Reset(v) => { v.encode(self.buf.get_mut()); trace!("encoded reset; rem={:?}", self.buf.remaining()); - } + }, } Ok(()) @@ -172,10 +172,10 @@ where Some(Next::Data(ref mut frame)) => { let mut buf = Buf::by_ref(&mut self.buf).chain(frame.payload_mut()); try_ready!(self.inner.write_buf(&mut buf)); - } + }, _ => { try_ready!(self.inner.write_buf(&mut self.buf)); - } + }, } } @@ -183,11 +183,11 @@ where match self.next.take() { Some(Next::Data(frame)) => { self.last_data_frame = Some(frame); - } + }, Some(Next::Continuation(_)) => { unimplemented!(); - } - None => {} + }, + None => {}, } trace!("flushing buffer"); diff --git a/src/frame/data.rs b/src/frame/data.rs index 71bdad7..9b76584 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -121,11 +121,11 @@ impl Data { }; Ok(Data { - stream_id: head.stream_id(), - data: payload, - flags: flags, - pad_len: pad_len, - }) + stream_id: head.stream_id(), + data: payload, + flags: flags, + pad_len: pad_len, + }) } } diff --git a/src/frame/go_away.rs b/src/frame/go_away.rs index b1dbd8c..4b7c4d2 100644 --- a/src/frame/go_away.rs +++ b/src/frame/go_away.rs @@ -35,9 +35,9 @@ impl GoAway { let error_code = unpack_octets_4!(payload, 4, u32); Ok(GoAway { - last_stream_id: last_stream_id, - error_code: error_code, - }) + last_stream_id: last_stream_id, + error_code: error_code, + }) } pub fn encode(&self, dst: &mut B) { diff --git a/src/frame/headers.rs b/src/frame/headers.rs index f6b460b..8d9b064 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -218,7 +218,7 @@ impl Headers { reg = true; self.fields.append(name, value); } - } + }, Authority(v) => set_pseudo!(authority, v), Method(v) => set_pseudo!(method, v), Scheme(v) => set_pseudo!(scheme, v), @@ -285,13 +285,11 @@ impl Headers { let ret = match encoder.encode(None, &mut headers, dst) { hpack::Encode::Full => None, - hpack::Encode::Partial(state) => { - Some(Continuation { - stream_id: self.stream_id, - hpack: state, - headers: headers, - }) - } + hpack::Encode::Partial(state) => Some(Continuation { + stream_id: self.stream_id, + hpack: state, + headers: headers, + }), }; // Compute the frame length @@ -336,10 +334,10 @@ impl PushPromise { let (promised_id, _) = StreamId::parse(&payload[..4]); Ok(PushPromise { - stream_id: head.stream_id(), - promised_id: promised_id, - flags: flags, - }) + stream_id: head.stream_id(), + promised_id: promised_id, + flags: flags, + }) } pub fn stream_id(&self) -> StreamId { diff --git a/src/frame/ping.rs b/src/frame/ping.rs index ba0d4c9..83ae18a 100644 --- a/src/frame/ping.rs +++ b/src/frame/ping.rs @@ -69,9 +69,9 @@ impl Ping { let ack = head.flag() & ACK_FLAG != 0; Ok(Ping { - ack, - payload, - }) + ack, + payload, + }) } pub fn encode(&self, dst: &mut B) { diff --git a/src/frame/priority.rs b/src/frame/priority.rs index 16d9494..d0f84c0 100644 --- a/src/frame/priority.rs +++ b/src/frame/priority.rs @@ -29,9 +29,9 @@ impl Priority { } Ok(Priority { - stream_id: head.stream_id(), - dependency: dependency, - }) + stream_id: head.stream_id(), + dependency: dependency, + }) } } diff --git a/src/frame/reason.rs b/src/frame/reason.rs index a2f6440..0c804c2 100644 --- a/src/frame/reason.rs +++ b/src/frame/reason.rs @@ -40,7 +40,7 @@ impl Reason { ConnectError => { "connection established in response to a CONNECT request \ was reset or abnormally closed" - } + }, EnhanceYourCalm => "detected excessive load generating behavior", InadequateSecurity => "security properties do not meet minimum requirements", Http11Required => "endpoint requires HTTP/1.1", diff --git a/src/frame/reset.rs b/src/frame/reset.rs index d82f401..2023023 100644 --- a/src/frame/reset.rs +++ b/src/frame/reset.rs @@ -32,15 +32,17 @@ impl Reset { let error_code = unpack_octets_4!(payload, 0, u32); Ok(Reset { - stream_id: head.stream_id(), - error_code: error_code, - }) + stream_id: head.stream_id(), + error_code: error_code, + }) } pub fn encode(&self, dst: &mut B) { - trace!("encoding RESET; id={:?} code={}", - self.stream_id, - self.error_code); + trace!( + "encoding RESET; id={:?} code={}", + self.stream_id, + self.error_code + ); let head = Head::new(Kind::Reset, 0, self.stream_id); head.encode(4, dst); dst.put_u32::(self.error_code); diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 81f9409..69821c8 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -109,38 +109,34 @@ impl Settings { match Setting::load(raw) { Some(HeaderTableSize(val)) => { settings.header_table_size = Some(val); - } - Some(EnablePush(val)) => { - match val { - 0 | 1 => { - settings.enable_push = Some(val); - } - _ => { - return Err(Error::InvalidSettingValue); - } - } - } + }, + Some(EnablePush(val)) => match val { + 0 | 1 => { + settings.enable_push = Some(val); + }, + _ => { + return Err(Error::InvalidSettingValue); + }, + }, Some(MaxConcurrentStreams(val)) => { settings.max_concurrent_streams = Some(val); - } - Some(InitialWindowSize(val)) => { - if val as usize > MAX_INITIAL_WINDOW_SIZE { - return Err(Error::InvalidSettingValue); - } else { - settings.initial_window_size = Some(val); - } - } + }, + Some(InitialWindowSize(val)) => if val as usize > MAX_INITIAL_WINDOW_SIZE { + return Err(Error::InvalidSettingValue); + } else { + settings.initial_window_size = Some(val); + }, Some(MaxFrameSize(val)) => { if val < DEFAULT_MAX_FRAME_SIZE || val as usize > MAX_MAX_FRAME_SIZE { return Err(Error::InvalidSettingValue); } else { settings.max_frame_size = Some(val); } - } + }, Some(MaxHeaderListSize(val)) => { settings.max_header_list_size = Some(val); - } - None => {} + }, + None => {}, } } diff --git a/src/frame/window_update.rs b/src/frame/window_update.rs index 6bfaed6..3488ac3 100644 --- a/src/frame/window_update.rs +++ b/src/frame/window_update.rs @@ -42,9 +42,9 @@ impl WindowUpdate { } Ok(WindowUpdate { - stream_id: head.stream_id(), - size_increment, - }) + stream_id: head.stream_id(), + size_increment, + }) } pub fn encode(&self, dst: &mut B) { diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index b8e08cc..23f4ea1 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -186,7 +186,7 @@ impl Decoder { trace!(" Indexed; rem={:?}", src.remaining()); can_resize = false; f(self.decode_indexed(src)?); - } + }, LiteralWithIndexing => { trace!(" LiteralWithIndexing; rem={:?}", src.remaining()); can_resize = false; @@ -196,13 +196,13 @@ impl Decoder { self.table.insert(entry.clone()); f(entry); - } + }, LiteralWithoutIndexing => { trace!(" LiteralWithoutIndexing; rem={:?}", src.remaining()); can_resize = false; let entry = self.decode_literal(src, false)?; f(entry); - } + }, LiteralNeverIndexed => { trace!(" LiteralNeverIndexed; rem={:?}", src.remaining()); can_resize = false; @@ -211,7 +211,7 @@ impl Decoder { // TODO: Track that this should never be indexed f(entry); - } + }, SizeUpdate => { trace!(" SizeUpdate; rem={:?}", src.remaining()); if !can_resize { @@ -220,7 +220,7 @@ impl Decoder { // Handle the dynamic table size update self.process_size_update(src)?; - } + }, } } @@ -234,9 +234,11 @@ impl Decoder { return Err(DecoderError::InvalidMaxDynamicSize); } - debug!("Decoder changed max table size from {} to {}", - self.table.size(), - new_size); + debug!( + "Decoder changed max table size from {} to {}", + self.table.size(), + new_size + ); self.table.set_max_size(new_size); @@ -287,9 +289,11 @@ impl Decoder { let len = decode_int(buf, 7)?; if len > buf.remaining() { - trace!("decode_string underflow; len={}; remaining={}", - len, - buf.remaining()); + trace!( + "decode_string underflow; len={}; remaining={}", + len, + buf.remaining() + ); return Err(DecoderError::StringUnderflow); } @@ -489,7 +493,7 @@ impl Table { // Can never happen as the size of the table must reach // 0 by the time we've exhausted all elements. panic!("Size of table != 0, but no headers left!"); - } + }, }; self.size -= last.len(); @@ -563,288 +567,194 @@ pub fn get_static(idx: usize) -> Header { 12 => Header::Status(StatusCode::BAD_REQUEST), 13 => Header::Status(StatusCode::NOT_FOUND), 14 => Header::Status(StatusCode::INTERNAL_SERVER_ERROR), - 15 => { - Header::Field { - name: header::ACCEPT_CHARSET, - value: HeaderValue::from_static(""), - } - } - 16 => { - Header::Field { - name: header::ACCEPT_ENCODING, - value: HeaderValue::from_static("gzip, deflate"), - } - } - 17 => { - Header::Field { - name: header::ACCEPT_LANGUAGE, - value: HeaderValue::from_static(""), - } - } - 18 => { - Header::Field { - name: header::ACCEPT_RANGES, - value: HeaderValue::from_static(""), - } - } - 19 => { - Header::Field { - name: header::ACCEPT, - value: HeaderValue::from_static(""), - } - } - 20 => { - Header::Field { - name: header::ACCESS_CONTROL_ALLOW_ORIGIN, - value: HeaderValue::from_static(""), - } - } - 21 => { - Header::Field { - name: header::AGE, - value: HeaderValue::from_static(""), - } - } - 22 => { - Header::Field { - name: header::ALLOW, - value: HeaderValue::from_static(""), - } - } - 23 => { - Header::Field { - name: header::AUTHORIZATION, - value: HeaderValue::from_static(""), - } - } - 24 => { - Header::Field { - name: header::CACHE_CONTROL, - value: HeaderValue::from_static(""), - } - } - 25 => { - Header::Field { - name: header::CONTENT_DISPOSITION, - value: HeaderValue::from_static(""), - } - } - 26 => { - Header::Field { - name: header::CONTENT_ENCODING, - value: HeaderValue::from_static(""), - } - } - 27 => { - Header::Field { - name: header::CONTENT_LANGUAGE, - value: HeaderValue::from_static(""), - } - } - 28 => { - Header::Field { - name: header::CONTENT_LENGTH, - value: HeaderValue::from_static(""), - } - } - 29 => { - Header::Field { - name: header::CONTENT_LOCATION, - value: HeaderValue::from_static(""), - } - } - 30 => { - Header::Field { - name: header::CONTENT_RANGE, - value: HeaderValue::from_static(""), - } - } - 31 => { - Header::Field { - name: header::CONTENT_TYPE, - value: HeaderValue::from_static(""), - } - } - 32 => { - Header::Field { - name: header::COOKIE, - value: HeaderValue::from_static(""), - } - } - 33 => { - Header::Field { - name: header::DATE, - value: HeaderValue::from_static(""), - } - } - 34 => { - Header::Field { - name: header::ETAG, - value: HeaderValue::from_static(""), - } - } - 35 => { - Header::Field { - name: header::EXPECT, - value: HeaderValue::from_static(""), - } - } - 36 => { - Header::Field { - name: header::EXPIRES, - value: HeaderValue::from_static(""), - } - } - 37 => { - Header::Field { - name: header::FROM, - value: HeaderValue::from_static(""), - } - } - 38 => { - Header::Field { - name: header::HOST, - value: HeaderValue::from_static(""), - } - } - 39 => { - Header::Field { - name: header::IF_MATCH, - value: HeaderValue::from_static(""), - } - } - 40 => { - Header::Field { - name: header::IF_MODIFIED_SINCE, - value: HeaderValue::from_static(""), - } - } - 41 => { - Header::Field { - name: header::IF_NONE_MATCH, - value: HeaderValue::from_static(""), - } - } - 42 => { - Header::Field { - name: header::IF_RANGE, - value: HeaderValue::from_static(""), - } - } - 43 => { - Header::Field { - name: header::IF_UNMODIFIED_SINCE, - value: HeaderValue::from_static(""), - } - } - 44 => { - Header::Field { - name: header::LAST_MODIFIED, - value: HeaderValue::from_static(""), - } - } - 45 => { - Header::Field { - name: header::LINK, - value: HeaderValue::from_static(""), - } - } - 46 => { - Header::Field { - name: header::LOCATION, - value: HeaderValue::from_static(""), - } - } - 47 => { - Header::Field { - name: header::MAX_FORWARDS, - value: HeaderValue::from_static(""), - } - } - 48 => { - Header::Field { - name: header::PROXY_AUTHENTICATE, - value: HeaderValue::from_static(""), - } - } - 49 => { - Header::Field { - name: header::PROXY_AUTHORIZATION, - value: HeaderValue::from_static(""), - } - } - 50 => { - Header::Field { - name: header::RANGE, - value: HeaderValue::from_static(""), - } - } - 51 => { - Header::Field { - name: header::REFERER, - value: HeaderValue::from_static(""), - } - } - 52 => { - Header::Field { - name: header::REFRESH, - value: HeaderValue::from_static(""), - } - } - 53 => { - Header::Field { - name: header::RETRY_AFTER, - value: HeaderValue::from_static(""), - } - } - 54 => { - Header::Field { - name: header::SERVER, - value: HeaderValue::from_static(""), - } - } - 55 => { - Header::Field { - name: header::SET_COOKIE, - value: HeaderValue::from_static(""), - } - } - 56 => { - Header::Field { - name: header::STRICT_TRANSPORT_SECURITY, - value: HeaderValue::from_static(""), - } - } - 57 => { - Header::Field { - name: header::TRANSFER_ENCODING, - value: HeaderValue::from_static(""), - } - } - 58 => { - Header::Field { - name: header::USER_AGENT, - value: HeaderValue::from_static(""), - } - } - 59 => { - Header::Field { - name: header::VARY, - value: HeaderValue::from_static(""), - } - } - 60 => { - Header::Field { - name: header::VIA, - value: HeaderValue::from_static(""), - } - } - 61 => { - Header::Field { - name: header::WWW_AUTHENTICATE, - value: HeaderValue::from_static(""), - } - } + 15 => Header::Field { + name: header::ACCEPT_CHARSET, + value: HeaderValue::from_static(""), + }, + 16 => Header::Field { + name: header::ACCEPT_ENCODING, + value: HeaderValue::from_static("gzip, deflate"), + }, + 17 => Header::Field { + name: header::ACCEPT_LANGUAGE, + value: HeaderValue::from_static(""), + }, + 18 => Header::Field { + name: header::ACCEPT_RANGES, + value: HeaderValue::from_static(""), + }, + 19 => Header::Field { + name: header::ACCEPT, + value: HeaderValue::from_static(""), + }, + 20 => Header::Field { + name: header::ACCESS_CONTROL_ALLOW_ORIGIN, + value: HeaderValue::from_static(""), + }, + 21 => Header::Field { + name: header::AGE, + value: HeaderValue::from_static(""), + }, + 22 => Header::Field { + name: header::ALLOW, + value: HeaderValue::from_static(""), + }, + 23 => Header::Field { + name: header::AUTHORIZATION, + value: HeaderValue::from_static(""), + }, + 24 => Header::Field { + name: header::CACHE_CONTROL, + value: HeaderValue::from_static(""), + }, + 25 => Header::Field { + name: header::CONTENT_DISPOSITION, + value: HeaderValue::from_static(""), + }, + 26 => Header::Field { + name: header::CONTENT_ENCODING, + value: HeaderValue::from_static(""), + }, + 27 => Header::Field { + name: header::CONTENT_LANGUAGE, + value: HeaderValue::from_static(""), + }, + 28 => Header::Field { + name: header::CONTENT_LENGTH, + value: HeaderValue::from_static(""), + }, + 29 => Header::Field { + name: header::CONTENT_LOCATION, + value: HeaderValue::from_static(""), + }, + 30 => Header::Field { + name: header::CONTENT_RANGE, + value: HeaderValue::from_static(""), + }, + 31 => Header::Field { + name: header::CONTENT_TYPE, + value: HeaderValue::from_static(""), + }, + 32 => Header::Field { + name: header::COOKIE, + value: HeaderValue::from_static(""), + }, + 33 => Header::Field { + name: header::DATE, + value: HeaderValue::from_static(""), + }, + 34 => Header::Field { + name: header::ETAG, + value: HeaderValue::from_static(""), + }, + 35 => Header::Field { + name: header::EXPECT, + value: HeaderValue::from_static(""), + }, + 36 => Header::Field { + name: header::EXPIRES, + value: HeaderValue::from_static(""), + }, + 37 => Header::Field { + name: header::FROM, + value: HeaderValue::from_static(""), + }, + 38 => Header::Field { + name: header::HOST, + value: HeaderValue::from_static(""), + }, + 39 => Header::Field { + name: header::IF_MATCH, + value: HeaderValue::from_static(""), + }, + 40 => Header::Field { + name: header::IF_MODIFIED_SINCE, + value: HeaderValue::from_static(""), + }, + 41 => Header::Field { + name: header::IF_NONE_MATCH, + value: HeaderValue::from_static(""), + }, + 42 => Header::Field { + name: header::IF_RANGE, + value: HeaderValue::from_static(""), + }, + 43 => Header::Field { + name: header::IF_UNMODIFIED_SINCE, + value: HeaderValue::from_static(""), + }, + 44 => Header::Field { + name: header::LAST_MODIFIED, + value: HeaderValue::from_static(""), + }, + 45 => Header::Field { + name: header::LINK, + value: HeaderValue::from_static(""), + }, + 46 => Header::Field { + name: header::LOCATION, + value: HeaderValue::from_static(""), + }, + 47 => Header::Field { + name: header::MAX_FORWARDS, + value: HeaderValue::from_static(""), + }, + 48 => Header::Field { + name: header::PROXY_AUTHENTICATE, + value: HeaderValue::from_static(""), + }, + 49 => Header::Field { + name: header::PROXY_AUTHORIZATION, + value: HeaderValue::from_static(""), + }, + 50 => Header::Field { + name: header::RANGE, + value: HeaderValue::from_static(""), + }, + 51 => Header::Field { + name: header::REFERER, + value: HeaderValue::from_static(""), + }, + 52 => Header::Field { + name: header::REFRESH, + value: HeaderValue::from_static(""), + }, + 53 => Header::Field { + name: header::RETRY_AFTER, + value: HeaderValue::from_static(""), + }, + 54 => Header::Field { + name: header::SERVER, + value: HeaderValue::from_static(""), + }, + 55 => Header::Field { + name: header::SET_COOKIE, + value: HeaderValue::from_static(""), + }, + 56 => Header::Field { + name: header::STRICT_TRANSPORT_SECURITY, + value: HeaderValue::from_static(""), + }, + 57 => Header::Field { + name: header::TRANSFER_ENCODING, + value: HeaderValue::from_static(""), + }, + 58 => Header::Field { + name: header::USER_AGENT, + value: HeaderValue::from_static(""), + }, + 59 => Header::Field { + name: header::VARY, + value: HeaderValue::from_static(""), + }, + 60 => Header::Field { + name: header::VIA, + value: HeaderValue::from_static(""), + }, + 61 => Header::Field { + name: header::WWW_AUTHENTICATE, + value: HeaderValue::from_static(""), + }, _ => unreachable!(), } } diff --git a/src/hpack/encoder.rs b/src/hpack/encoder.rs index 2818486..9c3a4c6 100644 --- a/src/hpack/encoder.rs +++ b/src/hpack/encoder.rs @@ -47,31 +47,27 @@ impl Encoder { #[allow(dead_code)] pub fn update_max_size(&mut self, val: usize) { match self.size_update { - Some(SizeUpdate::One(old)) => { - if val > old { - if old > self.table.max_size() { - self.size_update = Some(SizeUpdate::One(val)); - } else { - self.size_update = Some(SizeUpdate::Two(old, val)); - } - } else { - self.size_update = Some(SizeUpdate::One(val)); - } - } - Some(SizeUpdate::Two(min, _)) => { - if val < min { + Some(SizeUpdate::One(old)) => if val > old { + if old > self.table.max_size() { self.size_update = Some(SizeUpdate::One(val)); } else { - self.size_update = Some(SizeUpdate::Two(min, val)); + self.size_update = Some(SizeUpdate::Two(old, val)); } - } + } else { + self.size_update = Some(SizeUpdate::One(val)); + }, + Some(SizeUpdate::Two(min, _)) => if val < min { + self.size_update = Some(SizeUpdate::One(val)); + } else { + self.size_update = Some(SizeUpdate::Two(min, val)); + }, None => { if val != self.table.max_size() { // Don't bother writing a frame if the value already matches // the table's max size. self.size_update = Some(SizeUpdate::One(val)); } - } + }, } } @@ -124,13 +120,13 @@ impl Encoder { if res.is_err() { dst.truncate(len); return Encode::Partial(EncodeState { - index: index, - value: None, - }); + index: index, + value: None, + }); } last_index = Some(index); - } + }, // The header does not have an associated name. This means that // the name is the same as the previously yielded header. In // which case, we skip table lookup and just use the same index @@ -142,11 +138,11 @@ impl Encoder { if res.is_err() { dst.truncate(len); return Encode::Partial(EncodeState { - index: last_index.unwrap(), - value: Some(value), - }); + index: last_index.unwrap(), + value: Some(value), + }); } - } + }, }; } @@ -158,14 +154,14 @@ impl Encoder { Some(SizeUpdate::One(val)) => { self.table.resize(val); encode_size_update(val, dst)?; - } + }, Some(SizeUpdate::Two(min, max)) => { self.table.resize(min); self.table.resize(max); encode_size_update(min, dst)?; encode_size_update(max, dst)?; - } - None => {} + }, + None => {}, } Ok(()) @@ -175,12 +171,12 @@ impl Encoder { match *index { Index::Indexed(idx, _) => { encode_int(idx, 7, 0x80, dst)?; - } + }, Index::Name(idx, _) => { let header = self.table.resolve(&index); encode_not_indexed(idx, header.value_slice(), header.is_sensitive(), dst)?; - } + }, Index::Inserted(_) => { let header = self.table.resolve(&index); @@ -194,7 +190,7 @@ impl Encoder { encode_str(header.name().as_slice(), dst)?; encode_str(header.value_slice(), dst)?; - } + }, Index::InsertedValue(idx, _) => { let header = self.table.resolve(&index); @@ -202,15 +198,17 @@ impl Encoder { encode_int(idx, 6, 0b01000000, dst)?; encode_str(header.value_slice(), dst)?; - } + }, Index::NotIndexed(_) => { let header = self.table.resolve(&index); - encode_not_indexed2(header.name().as_slice(), - header.value_slice(), - header.is_sensitive(), - dst)?; - } + encode_not_indexed2( + header.name().as_slice(), + header.value_slice(), + header.is_sensitive(), + dst, + )?; + }, } Ok(()) @@ -230,15 +228,17 @@ impl Encoder { let idx = self.table.resolve_idx(last); encode_not_indexed(idx, value.as_ref(), value.is_sensitive(), dst)?; - } + }, Index::NotIndexed(_) => { let last = self.table.resolve(last); - encode_not_indexed2(last.name().as_slice(), - value.as_ref(), - value.is_sensitive(), - dst)?; - } + encode_not_indexed2( + last.name().as_slice(), + value.as_ref(), + value.is_sensitive(), + dst, + )?; + }, } Ok(()) @@ -570,8 +570,10 @@ mod test { // Using the name component of a previously indexed header (without // sensitive flag set) - let _ = encode(&mut encoder, - vec![self::header("my-password", "not-so-secret")]); + let _ = encode( + &mut encoder, + vec![self::header("my-password", "not-so-secret")], + ); let name = "my-password".parse().unwrap(); let mut value = HeaderValue::from_bytes(b"12345").unwrap(); @@ -794,7 +796,7 @@ mod test { dst.clear(); match encoder.encode(Some(resume), &mut input, &mut dst) { - Encode::Full => {} + Encode::Full => {}, _ => panic!(), } diff --git a/src/hpack/header.rs b/src/hpack/header.rs index 9ec9beb..b91701f 100644 --- a/src/hpack/header.rs +++ b/src/hpack/header.rs @@ -38,25 +38,23 @@ impl Header> { use self::Header::*; Ok(match self { - Field { - name: Some(n), - value, - } => { - Field { - name: n, - value: value, - } - } - Field { - name: None, - value, - } => return Err(value), - Authority(v) => Authority(v), - Method(v) => Method(v), - Scheme(v) => Scheme(v), - Path(v) => Path(v), - Status(v) => Status(v), - }) + Field { + name: Some(n), + value, + } => Field { + name: n, + value: value, + }, + Field { + name: None, + value, + } => return Err(value), + Authority(v) => Authority(v), + Method(v) => Method(v), + Scheme(v) => Scheme(v), + Path(v) => Path(v), + Status(v) => Status(v), + }) } } @@ -67,23 +65,23 @@ impl Header { b"authority" => { let value = String::try_from(value)?; Ok(Header::Authority(value)) - } + }, b"method" => { let method = Method::from_bytes(&value)?; Ok(Header::Method(method)) - } + }, b"scheme" => { let value = String::try_from(value)?; Ok(Header::Scheme(value)) - } + }, b"path" => { let value = String::try_from(value)?; Ok(Header::Path(value)) - } + }, b"status" => { let status = StatusCode::from_bytes(&value)?; Ok(Header::Status(status)) - } + }, _ => Err(DecoderError::InvalidPseudoheader), } } else { @@ -92,9 +90,9 @@ impl Header { let value = HeaderValue::from_bytes(&value)?; Ok(Header::Field { - name: name, - value: value, - }) + name: name, + value: value, + }) } } @@ -151,37 +149,27 @@ impl Header { } => a == value, _ => false, } - } - Header::Authority(ref a) => { - match *other { - Header::Authority(ref b) => a == b, - _ => false, - } - } - Header::Method(ref a) => { - match *other { - Header::Method(ref b) => a == b, - _ => false, - } - } - Header::Scheme(ref a) => { - match *other { - Header::Scheme(ref b) => a == b, - _ => false, - } - } - Header::Path(ref a) => { - match *other { - Header::Path(ref b) => a == b, - _ => false, - } - } - Header::Status(ref a) => { - match *other { - Header::Status(ref b) => a == b, - _ => false, - } - } + }, + Header::Authority(ref a) => match *other { + Header::Authority(ref b) => a == b, + _ => false, + }, + Header::Method(ref a) => match *other { + Header::Method(ref b) => a == b, + _ => false, + }, + Header::Scheme(ref a) => match *other { + Header::Scheme(ref b) => a == b, + _ => false, + }, + Header::Path(ref a) => match *other { + Header::Path(ref b) => a == b, + _ => false, + }, + Header::Status(ref a) => match *other { + Header::Status(ref b) => a == b, + _ => false, + }, } } @@ -201,20 +189,18 @@ impl Header { match *self { Header::Field { ref name, .. - } => { - match *name { - header::AGE | - header::AUTHORIZATION | - header::CONTENT_LENGTH | - header::ETAG | - header::IF_MODIFIED_SINCE | - header::IF_NONE_MATCH | - header::LOCATION | - header::COOKIE | - header::SET_COOKIE => true, - _ => false, - } - } + } => match *name { + header::AGE | + header::AUTHORIZATION | + header::CONTENT_LENGTH | + header::ETAG | + header::IF_MODIFIED_SINCE | + header::IF_NONE_MATCH | + header::LOCATION | + header::COOKIE | + header::SET_COOKIE => true, + _ => false, + }, Header::Path(..) => true, _ => false, } @@ -228,12 +214,10 @@ impl From
for Header> { Header::Field { name, value, - } => { - Header::Field { - name: Some(name), - value, - } - } + } => Header::Field { + name: Some(name), + value, + }, Header::Authority(v) => Header::Authority(v), Header::Method(v) => Header::Method(v), Header::Scheme(v) => Header::Scheme(v), @@ -246,12 +230,10 @@ impl From
for Header> { impl<'a> Name<'a> { pub fn into_entry(self, value: Bytes) -> Result { match self { - Name::Field(name) => { - Ok(Header::Field { - name: name.clone(), - value: HeaderValue::from_bytes(&*value)?, - }) - } + Name::Field(name) => Ok(Header::Field { + name: name.clone(), + value: HeaderValue::from_bytes(&*value)?, + }), Name::Authority => Ok(Header::Authority(String::try_from(value)?)), Name::Method => Ok(Header::Method(Method::from_bytes(&*value)?)), Name::Scheme => Ok(Header::Scheme(String::try_from(value)?)), @@ -262,7 +244,7 @@ impl<'a> Name<'a> { // TODO: better error handling Err(_) => Err(DecoderError::InvalidStatusCode), } - } + }, } } diff --git a/src/hpack/table.rs b/src/hpack/table.rs index 571d579..9fe446b 100644 --- a/src/hpack/table.rs +++ b/src/hpack/table.rs @@ -292,11 +292,13 @@ impl Table { let pos_idx = 0usize.wrapping_sub(self.inserted); - let prev = mem::replace(&mut self.indices[probe], - Some(Pos { - index: pos_idx, - hash: hash, - })); + let prev = mem::replace( + &mut self.indices[probe], + Some(Pos { + index: pos_idx, + hash: hash, + }), + ); if let Some(mut prev) = prev { // Shift forward @@ -325,10 +327,10 @@ impl Table { self.inserted = self.inserted.wrapping_add(1); self.slots.push_front(Slot { - hash: hash, - header: header, - next: None, - }); + hash: hash, + header: header, + next: None, + }); } pub fn resize(&mut self, size: usize) { @@ -377,12 +379,14 @@ impl Table { // Update the size self.size -= slot.header.len(); - debug_assert_eq!(self.indices - .iter() - .filter_map(|p| *p) - .filter(|p| p.index == pos_idx) - .count(), - 1); + debug_assert_eq!( + self.indices + .iter() + .filter_map(|p| *p) + .filter(|p| p.index == pos_idx) + .count(), + 1 + ); // Find the associated position probe_loop!(probe < self.indices.len(), { @@ -495,12 +499,12 @@ impl Table { } debug_assert!({ - let them = self.indices[probe].unwrap(); - let their_distance = probe_distance(self.mask, them.hash, probe); - let our_distance = probe_distance(self.mask, pos.hash, probe); + let them = self.indices[probe].unwrap(); + let their_distance = probe_distance(self.mask, them.hash, probe); + let our_distance = probe_distance(self.mask, pos.hash, probe); - their_distance >= our_distance - }); + their_distance >= our_distance + }); }); } } @@ -661,97 +665,85 @@ fn index_static(header: &Header) -> Option<(usize, bool)> { Header::Field { ref name, ref value, - } => { - match *name { - header::ACCEPT_CHARSET => Some((15, false)), - header::ACCEPT_ENCODING => { - if value == "gzip, deflate" { - Some((16, true)) - } else { - Some((16, false)) - } - } - header::ACCEPT_LANGUAGE => Some((17, false)), - header::ACCEPT_RANGES => Some((18, false)), - header::ACCEPT => Some((19, false)), - header::ACCESS_CONTROL_ALLOW_ORIGIN => Some((20, false)), - header::AGE => Some((21, false)), - header::ALLOW => Some((22, false)), - header::AUTHORIZATION => Some((23, false)), - header::CACHE_CONTROL => Some((24, false)), - header::CONTENT_DISPOSITION => Some((25, false)), - header::CONTENT_ENCODING => Some((26, false)), - header::CONTENT_LANGUAGE => Some((27, false)), - header::CONTENT_LENGTH => Some((28, false)), - header::CONTENT_LOCATION => Some((29, false)), - header::CONTENT_RANGE => Some((30, false)), - header::CONTENT_TYPE => Some((31, false)), - header::COOKIE => Some((32, false)), - header::DATE => Some((33, false)), - header::ETAG => Some((34, false)), - header::EXPECT => Some((35, false)), - header::EXPIRES => Some((36, false)), - header::FROM => Some((37, false)), - header::HOST => Some((38, false)), - header::IF_MATCH => Some((39, false)), - header::IF_MODIFIED_SINCE => Some((40, false)), - header::IF_NONE_MATCH => Some((41, false)), - header::IF_RANGE => Some((42, false)), - header::IF_UNMODIFIED_SINCE => Some((43, false)), - header::LAST_MODIFIED => Some((44, false)), - header::LINK => Some((45, false)), - header::LOCATION => Some((46, false)), - header::MAX_FORWARDS => Some((47, false)), - header::PROXY_AUTHENTICATE => Some((48, false)), - header::PROXY_AUTHORIZATION => Some((49, false)), - header::RANGE => Some((50, false)), - header::REFERER => Some((51, false)), - header::REFRESH => Some((52, false)), - header::RETRY_AFTER => Some((53, false)), - header::SERVER => Some((54, false)), - header::SET_COOKIE => Some((55, false)), - header::STRICT_TRANSPORT_SECURITY => Some((56, false)), - header::TRANSFER_ENCODING => Some((57, false)), - header::USER_AGENT => Some((58, false)), - header::VARY => Some((59, false)), - header::VIA => Some((60, false)), - header::WWW_AUTHENTICATE => Some((61, false)), - _ => None, - } - } + } => match *name { + header::ACCEPT_CHARSET => Some((15, false)), + header::ACCEPT_ENCODING => if value == "gzip, deflate" { + Some((16, true)) + } else { + Some((16, false)) + }, + header::ACCEPT_LANGUAGE => Some((17, false)), + header::ACCEPT_RANGES => Some((18, false)), + header::ACCEPT => Some((19, false)), + header::ACCESS_CONTROL_ALLOW_ORIGIN => Some((20, false)), + header::AGE => Some((21, false)), + header::ALLOW => Some((22, false)), + header::AUTHORIZATION => Some((23, false)), + header::CACHE_CONTROL => Some((24, false)), + header::CONTENT_DISPOSITION => Some((25, false)), + header::CONTENT_ENCODING => Some((26, false)), + header::CONTENT_LANGUAGE => Some((27, false)), + header::CONTENT_LENGTH => Some((28, false)), + header::CONTENT_LOCATION => Some((29, false)), + header::CONTENT_RANGE => Some((30, false)), + header::CONTENT_TYPE => Some((31, false)), + header::COOKIE => Some((32, false)), + header::DATE => Some((33, false)), + header::ETAG => Some((34, false)), + header::EXPECT => Some((35, false)), + header::EXPIRES => Some((36, false)), + header::FROM => Some((37, false)), + header::HOST => Some((38, false)), + header::IF_MATCH => Some((39, false)), + header::IF_MODIFIED_SINCE => Some((40, false)), + header::IF_NONE_MATCH => Some((41, false)), + header::IF_RANGE => Some((42, false)), + header::IF_UNMODIFIED_SINCE => Some((43, false)), + header::LAST_MODIFIED => Some((44, false)), + header::LINK => Some((45, false)), + header::LOCATION => Some((46, false)), + header::MAX_FORWARDS => Some((47, false)), + header::PROXY_AUTHENTICATE => Some((48, false)), + header::PROXY_AUTHORIZATION => Some((49, false)), + header::RANGE => Some((50, false)), + header::REFERER => Some((51, false)), + header::REFRESH => Some((52, false)), + header::RETRY_AFTER => Some((53, false)), + header::SERVER => Some((54, false)), + header::SET_COOKIE => Some((55, false)), + header::STRICT_TRANSPORT_SECURITY => Some((56, false)), + header::TRANSFER_ENCODING => Some((57, false)), + header::USER_AGENT => Some((58, false)), + header::VARY => Some((59, false)), + header::VIA => Some((60, false)), + header::WWW_AUTHENTICATE => Some((61, false)), + _ => None, + }, Header::Authority(_) => Some((1, false)), - Header::Method(ref v) => { - match *v { - Method::GET => Some((2, true)), - Method::POST => Some((3, true)), - _ => Some((2, false)), - } - } - Header::Scheme(ref v) => { - match &**v { - "http" => Some((6, true)), - "https" => Some((7, true)), - _ => Some((6, false)), - } - } - Header::Path(ref v) => { - match &**v { - "/" => Some((4, true)), - "/index.html" => Some((5, true)), - _ => Some((4, false)), - } - } - Header::Status(ref v) => { - match u16::from(*v) { - 200 => Some((8, true)), - 204 => Some((9, true)), - 206 => Some((10, true)), - 304 => Some((11, true)), - 400 => Some((12, true)), - 404 => Some((13, true)), - 500 => Some((14, true)), - _ => Some((8, false)), - } - } + Header::Method(ref v) => match *v { + Method::GET => Some((2, true)), + Method::POST => Some((3, true)), + _ => Some((2, false)), + }, + Header::Scheme(ref v) => match &**v { + "http" => Some((6, true)), + "https" => Some((7, true)), + _ => Some((6, false)), + }, + Header::Path(ref v) => match &**v { + "/" => Some((4, true)), + "/index.html" => Some((5, true)), + _ => Some((4, false)), + }, + Header::Status(ref v) => match u16::from(*v) { + 200 => Some((8, true)), + 204 => Some((9, true)), + 206 => Some((10, true)), + 304 => Some((11, true)), + 400 => Some((12, true)), + 404 => Some((13, true)), + 500 => Some((14, true)), + _ => Some((8, false)), + }, } } diff --git a/src/hpack/test/fuzz.rs b/src/hpack/test/fuzz.rs index 884e9b7..7b19aec 100644 --- a/src/hpack/test/fuzz.rs +++ b/src/hpack/test/fuzz.rs @@ -81,11 +81,11 @@ impl FuzzHpack { let low = rng.gen_range(0, high); frame.resizes.extend(&[low, high]); - } + }, 1...3 => { frame.resizes.push(rng.gen_range(128, MAX_CHUNK * 2)); - } - _ => {} + }, + _ => {}, } for _ in 0..rng.gen_range(1, (num - added) + 1) { @@ -155,7 +155,7 @@ impl FuzzHpack { .unwrap(); buf = BytesMut::with_capacity(chunks.pop().unwrap_or(MAX_CHUNK)); - } + }, } } @@ -185,7 +185,7 @@ fn gen_header(g: &mut StdRng) -> Header> { 0 => { let value = gen_string(g, 4, 20); Header::Authority(to_shared(value)) - } + }, 1 => { let method = match g.next_u32() % 6 { 0 => Method::GET, @@ -200,12 +200,12 @@ fn gen_header(g: &mut StdRng) -> Header> { .collect(); Method::from_bytes(&bytes).unwrap() - } + }, _ => unreachable!(), }; Header::Method(method) - } + }, 2 => { let value = match g.next_u32() % 2 { 0 => "http", @@ -214,7 +214,7 @@ fn gen_header(g: &mut StdRng) -> Header> { }; Header::Scheme(to_shared(value.to_string())) - } + }, 3 => { let value = match g.next_u32() % 100 { 0 => "/".to_string(), @@ -223,12 +223,12 @@ fn gen_header(g: &mut StdRng) -> Header> { }; Header::Path(to_shared(value)) - } + }, 4 => { let status = (g.gen::() % 500) + 100; Header::Status(StatusCode::from_u16(status).unwrap()) - } + }, _ => unreachable!(), } } else { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index e17030d..9c297b2 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -61,11 +61,11 @@ where pub fn new(codec: Codec>) -> Connection { // TODO: Actually configure let streams = Streams::new(streams::Config { - max_remote_initiated: None, - init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, - max_local_initiated: None, - init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, - }); + max_remote_initiated: None, + init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, + max_local_initiated: None, + init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, + }); Connection { state: State::Open, @@ -85,8 +85,10 @@ where // The order of these calls don't really matter too much as only one // should have pending work. try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); - try_ready!(self.settings - .send_pending_ack(&mut self.codec, &mut self.streams)); + try_ready!( + self.settings + .send_pending_ack(&mut self.codec, &mut self.streams) + ); try_ready!(self.streams.send_pending_refusal(&mut self.codec)); Ok(().into()) @@ -112,7 +114,7 @@ where try_ready!(self.streams.poll_complete(&mut self.codec)); return Ok(Async::NotReady); - } + }, // Attempting to read a frame resulted in a connection level // error. This is handled by setting a GO_AWAY frame followed by // terminating the connection. @@ -127,17 +129,17 @@ where // Transition to the going away state. self.state = State::GoAway(frame); - } + }, // Attempting to read a frame resulted in a stream level error. // This is handled by resetting the frame then trying to read // another frame. Err(Stream { - id, - reason, - }) => { + id, + reason, + }) => { trace!("stream level error; id={:?}; reason={:?}", id, reason); self.streams.send_reset(id, reason); - } + }, // Attempting to read a frame resulted in an I/O error. All // active streams must be reset. // @@ -150,9 +152,9 @@ where // Return the error return Err(e); - } + }, } - } + }, State::GoAway(frame) => { // Ensure the codec is ready to accept the frame try_ready!(self.codec.poll_ready()); @@ -165,17 +167,17 @@ where // GO_AWAY sent, transition the connection to an errored state self.state = State::Flush(frame.reason()); - } + }, State::Flush(reason) => { // Flush the codec try_ready!(self.codec.flush()); // Transition the state to error self.state = State::Error(reason); - } + }, State::Error(reason) => { return Err(reason.into()); - } + }, } } } @@ -191,46 +193,46 @@ where Some(Headers(frame)) => { trace!("recv HEADERS; frame={:?}", frame); self.streams.recv_headers(frame)?; - } + }, Some(Data(frame)) => { trace!("recv DATA; frame={:?}", frame); self.streams.recv_data(frame)?; - } + }, Some(Reset(frame)) => { trace!("recv RST_STREAM; frame={:?}", frame); self.streams.recv_reset(frame)?; - } + }, Some(PushPromise(frame)) => { trace!("recv PUSH_PROMISE; frame={:?}", frame); self.streams.recv_push_promise(frame)?; - } + }, Some(Settings(frame)) => { trace!("recv SETTINGS; frame={:?}", frame); self.settings.recv_settings(frame); - } + }, Some(GoAway(_)) => { // TODO: handle the last_processed_id. Also, should this be // handled as an error? // let _ = RecvError::Proto(frame.reason()); return Ok(().into()); - } + }, Some(Ping(frame)) => { trace!("recv PING; frame={:?}", frame); self.ping_pong.recv_ping(frame); - } + }, Some(WindowUpdate(frame)) => { trace!("recv WINDOW_UPDATE; frame={:?}", frame); self.streams.recv_window_update(frame)?; - } + }, Some(Priority(frame)) => { trace!("recv PRIORITY; frame={:?}", frame); // TODO: handle - } + }, None => { // TODO: Is this correct? trace!("codec closed"); return Ok(Async::Ready(())); - } + }, } } } diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 6feb070..8b31a4d 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -50,41 +50,41 @@ impl Deque { pub fn push_back(&mut self, buf: &mut Buffer, value: T) { let key = buf.slab.insert(Slot { - value, - next: None, - }); + value, + next: None, + }); match self.indices { Some(ref mut idxs) => { buf.slab[idxs.tail].next = Some(key); idxs.tail = key; - } + }, None => { self.indices = Some(Indices { - head: key, - tail: key, - }); - } + head: key, + tail: key, + }); + }, } } pub fn push_front(&mut self, buf: &mut Buffer, value: T) { let key = buf.slab.insert(Slot { - value, - next: None, - }); + value, + next: None, + }); match self.indices { Some(ref mut idxs) => { buf.slab[key].next = Some(idxs.head); idxs.head = key; - } + }, None => { self.indices = Some(Indices { - head: key, - tail: key, - }); - } + head: key, + tail: key, + }); + }, } } @@ -102,7 +102,7 @@ impl Deque { } return Some(slot.value); - } + }, None => None, } } diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index 08f3900..a857f3d 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -110,10 +110,12 @@ impl FlowControl { return Err(Reason::FlowControlError); } - trace!("inc_window; sz={}; old={}; new={}", - sz, - self.window_size, - val); + trace!( + "inc_window; sz={}; old={}; new={}", + sz, + self.window_size, + val + ); self.window_size = val; Ok(()) @@ -131,10 +133,12 @@ impl FlowControl { /// Decrements the window reflecting data has actually been sent. The caller /// must ensure that the window has capacity. pub fn send_data(&mut self, sz: WindowSize) { - trace!("send_data; sz={}; window={}; available={}", - sz, - self.window_size, - self.available); + trace!( + "send_data; sz={}; window={}; available={}", + sz, + self.window_size, + self.available + ); // Ensure that the argument is correct assert!(sz <= self.window_size as WindowSize); diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 2cb1a3a..1488555 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -111,10 +111,12 @@ where // Update the buffered data counter stream.buffered_send_data += sz; - trace!("send_data; sz={}; buffered={}; requested={}", - sz, - stream.buffered_send_data, - stream.requested_send_capacity); + trace!( + "send_data; sz={}; buffered={}; requested={}", + sz, + stream.buffered_send_data, + stream.requested_send_capacity + ); // Implicitly request more send capacity if not enough has been // requested yet. @@ -130,9 +132,11 @@ where self.reserve_capacity(0, stream); } - trace!("send_data (2); available={}; buffered={}", - stream.send_flow.available(), - stream.buffered_send_data); + trace!( + "send_data (2); available={}; buffered={}", + stream.send_flow.available(), + stream.buffered_send_data + ); if stream.send_flow.available() >= stream.buffered_send_data { // The stream currently has capacity to send the data frame, so @@ -152,11 +156,13 @@ where /// Request capacity to send data pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { - trace!("reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}", - stream.id, - capacity, - capacity + stream.buffered_send_data, - stream.requested_send_capacity); + trace!( + "reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}", + stream.id, + capacity, + capacity + stream.buffered_send_data, + stream.requested_send_capacity + ); // Actual capacity is `capacity` + the current amount of buffered data. // It it were less, then we could never send out the buffered data. @@ -196,11 +202,13 @@ where inc: WindowSize, stream: &mut store::Ptr, ) -> Result<(), Reason> { - trace!("recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}", - stream.id, - stream.state, - inc, - stream.send_flow); + trace!( + "recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}", + stream.id, + stream.state, + inc, + stream.send_flow + ); // Update the stream level flow control. stream.send_flow.inc_window(inc)?; @@ -254,16 +262,20 @@ where // The amount of additional capacity that the stream requests. // Don't assign more than the window has available! - let additional = cmp::min(total_requested - stream.send_flow.available(), - // Can't assign more than what is available - stream.send_flow.window_size() - stream.send_flow.available()); + let additional = cmp::min( + total_requested - stream.send_flow.available(), + // Can't assign more than what is available + stream.send_flow.window_size() - stream.send_flow.available(), + ); - trace!("try_assign_capacity; requested={}; additional={}; buffered={}; window={}; conn={}", - total_requested, - additional, - stream.buffered_send_data, - stream.send_flow.window_size(), - self.flow.available()); + trace!( + "try_assign_capacity; requested={}; additional={}; buffered={}; window={}; conn={}", + total_requested, + additional, + stream.buffered_send_data, + stream.send_flow.window_size(), + self.flow.available() + ); if additional == 0 { // Nothing more to do @@ -273,9 +285,11 @@ where // If the stream has requested capacity, then it must be in the // streaming state (more data could be sent) or there is buffered data // waiting to be sent. - debug_assert!(stream.state.is_send_streaming() || stream.buffered_send_data > 0, - "state={:?}", - stream.state); + debug_assert!( + stream.state.is_send_streaming() || stream.buffered_send_data > 0, + "state={:?}", + stream.state + ); // The amount of currently available capacity on the connection let conn_available = self.flow.available(); @@ -296,12 +310,13 @@ where self.flow.claim_capacity(assign); } - trace!("try_assign_capacity; available={}; requested={}; buffered={}; \ - has_unavailable={:?}", - stream.send_flow.available(), - stream.requested_send_capacity, - stream.buffered_send_data, - stream.send_flow.has_unavailable()); + trace!( + "try_assign_capacity; available={}; requested={}; buffered={}; has_unavailable={:?}", + stream.send_flow.available(), + stream.requested_send_capacity, + stream.buffered_send_data, + stream.send_flow.has_unavailable() + ); if stream.send_flow.available() < stream.requested_send_capacity { if stream.send_flow.has_unavailable() { @@ -367,7 +382,7 @@ where // Because, always try to reclaim... self.reclaim_frame(store, dst); - } + }, None => { // Try to flush the codec. try_ready!(dst.flush()); @@ -379,7 +394,7 @@ where // No need to poll ready as poll_complete() does this for // us... - } + }, } } } @@ -400,9 +415,11 @@ where // First check if there are any data chunks to take back if let Some(frame) = dst.take_last_data_frame() { - trace!(" -> reclaimed; frame={:?}; sz={}", - frame, - frame.payload().remaining()); + trace!( + " -> reclaimed; frame={:?}; sz={}", + frame, + frame.payload().remaining() + ); let mut eos = false; let key = frame.payload().stream; @@ -474,20 +491,24 @@ where let stream_capacity = stream.send_flow.available(); let sz = frame.payload().remaining(); - trace!(" --> data frame; stream={:?}; sz={}; eos={:?}; \ - window={}; available={}; requested={}", - frame.stream_id(), - sz, - frame.is_end_stream(), - stream_capacity, - stream.send_flow.available(), - stream.requested_send_capacity); + trace!( + " --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \ + available={}; requested={}", + frame.stream_id(), + sz, + frame.is_end_stream(), + stream_capacity, + stream.send_flow.available(), + stream.requested_send_capacity + ); // Zero length data frames always have capacity to // be sent. if sz > 0 && stream_capacity == 0 { - trace!(" --> stream capacity is 0; requested={}", - stream.requested_send_capacity); + trace!( + " --> stream capacity is 0; requested={}", + stream.requested_send_capacity + ); // Ensure that the stream is waiting for // connection level capacity @@ -551,7 +572,7 @@ where stream: stream.key(), } })) - } + }, frame => frame.map(|_| unreachable!()), }; @@ -568,7 +589,7 @@ where counts.transition_after(stream, is_counted); return Some(frame); - } + }, None => return None, } } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index c6a891e..e0f041c 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -150,7 +150,7 @@ where Ok(v) => v, Err(_) => { unimplemented!(); - } + }, }; stream.content_length = ContentLength::Remaining(content_length); @@ -185,9 +185,9 @@ where if stream.ensure_content_length_zero().is_err() { return Err(RecvError::Stream { - id: stream.id, - reason: ProtocolError, - }); + id: stream.id, + reason: ProtocolError, + }); } let trailers = frame.into_fields(); @@ -264,10 +264,12 @@ where return Err(RecvError::Connection(ProtocolError)); } - trace!("recv_data; size={}; connection={}; stream={}", - sz, - self.flow.window_size(), - stream.recv_flow.window_size()); + trace!( + "recv_data; size={}; connection={}; stream={}", + sz, + self.flow.window_size(), + stream.recv_flow.window_size() + ); // Ensure that there is enough capacity on the connection before acting // on the stream. @@ -286,17 +288,17 @@ where if stream.dec_content_length(frame.payload().len()).is_err() { return Err(RecvError::Stream { - id: stream.id, - reason: ProtocolError, - }); + id: stream.id, + reason: ProtocolError, + }); } if frame.is_end_stream() { if stream.ensure_content_length_zero().is_err() { return Err(RecvError::Stream { - id: stream.id, - reason: ProtocolError, - }); + id: stream.id, + reason: ProtocolError, + }); } if stream.state.recv_close().is_err() { @@ -343,9 +345,11 @@ where // TODO: All earlier stream IDs should be implicitly closed. // Now, create a new entry for the stream - let mut new_stream = Stream::new(frame.promised_id(), - send.init_window_sz(), - self.init_window_sz); + let mut new_stream = Stream::new( + frame.promised_id(), + send.init_window_sz(), + self.init_window_sz, + ); new_stream.state.reserve_remote()?; @@ -559,7 +563,7 @@ where // No more data frames Ok(None.into()) - } + }, None => { if stream.state.is_recv_closed() { // No more data frames will be received @@ -569,7 +573,7 @@ where stream.recv_task = Some(task::current()); Ok(Async::NotReady) } - } + }, } } @@ -584,7 +588,7 @@ where // the entire set of data frames have been consumed. What should // we do? unimplemented!(); - } + }, None => { if stream.state.is_recv_closed() { // There will be no trailer frame @@ -594,7 +598,7 @@ where stream.recv_task = Some(task::current()); Ok(Async::NotReady) } - } + }, } } } @@ -630,7 +634,7 @@ where stream.recv_task = Some(task::current()); Ok(Async::NotReady) - } + }, } } } diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index ada5553..5901fa7 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -70,9 +70,11 @@ where stream: &mut store::Ptr, task: &mut Option, ) -> Result<(), UserError> { - trace!("send_headers; frame={:?}; init_window={:?}", - frame, - self.init_window_sz); + trace!( + "send_headers; frame={:?}; init_window={:?}", + frame, + self.init_window_sz + ); let end_stream = frame.is_end_stream(); @@ -261,10 +263,12 @@ where let stream = &mut *stream; stream.send_flow.dec_window(dec); - trace!("decremented stream window; id={:?}; decr={}; flow={:?}", - stream.id, - dec, - stream.send_flow); + trace!( + "decremented stream window; id={:?}; decr={}; flow={:?}", + stream.id, + dec, + stream.send_flow + ); // TODO: Probably try to assign capacity? diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 6bd8009..200e3b0 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -83,40 +83,34 @@ impl State { let local = Peer::Streaming; self.inner = match self.inner { - Idle => { - if eos { - HalfClosedLocal(AwaitingHeaders) - } else { - Open { - local, - remote: AwaitingHeaders, - } + Idle => if eos { + HalfClosedLocal(AwaitingHeaders) + } else { + Open { + local, + remote: AwaitingHeaders, } - } + }, Open { local: AwaitingHeaders, remote, - } => { - if eos { - HalfClosedLocal(remote) - } else { - Open { - local, - remote, - } + } => if eos { + HalfClosedLocal(remote) + } else { + Open { + local, + remote, } - } - HalfClosedRemote(AwaitingHeaders) => { - if eos { - Closed(None) - } else { - HalfClosedRemote(local) - } - } + }, + HalfClosedRemote(AwaitingHeaders) => if eos { + Closed(None) + } else { + HalfClosedRemote(local) + }, _ => { // All other transitions result in a protocol error return Err(UnexpectedFrameType); - } + }, }; return Ok(()); @@ -142,7 +136,7 @@ impl State { remote, } } - } + }, ReservedRemote => { initial = true; @@ -154,31 +148,27 @@ impl State { remote, } } - } + }, Open { local, remote: AwaitingHeaders, - } => { - if eos { - HalfClosedRemote(local) - } else { - Open { - local, - remote, - } + } => if eos { + HalfClosedRemote(local) + } else { + Open { + local, + remote, } - } - HalfClosedLocal(AwaitingHeaders) => { - if eos { - Closed(None) - } else { - HalfClosedLocal(remote) - } - } + }, + HalfClosedLocal(AwaitingHeaders) => if eos { + Closed(None) + } else { + HalfClosedLocal(remote) + }, _ => { // All other transitions result in a protocol error return Err(RecvError::Connection(ProtocolError)); - } + }, }; return Ok(initial); @@ -190,7 +180,7 @@ impl State { Idle => { self.inner = ReservedRemote; Ok(()) - } + }, _ => Err(RecvError::Connection(ProtocolError)), } } @@ -205,12 +195,12 @@ impl State { trace!("recv_close: Open => HalfClosedRemote({:?})", local); self.inner = HalfClosedRemote(local); Ok(()) - } + }, HalfClosedLocal(..) => { trace!("recv_close: HalfClosedLocal => Closed"); self.inner = Closed(None); Ok(()) - } + }, _ => Err(RecvError::Connection(ProtocolError)), } } @@ -219,14 +209,14 @@ impl State { use proto::Error::*; match self.inner { - Closed(..) => {} + Closed(..) => {}, _ => { trace!("recv_err; err={:?}", err); self.inner = Closed(match *err { - Proto(reason) => Some(Cause::Proto(reason)), - Io(..) => Some(Cause::Io), - }); - } + Proto(reason) => Some(Cause::Proto(reason)), + Io(..) => Some(Cause::Io), + }); + }, } } @@ -239,11 +229,11 @@ impl State { // The remote side will continue to receive data. trace!("send_close: Open => HalfClosedLocal({:?})", remote); self.inner = HalfClosedLocal(remote); - } + }, HalfClosedRemote(..) => { trace!("send_close: HalfClosedRemote => Closed"); self.inner = Closed(None); - } + }, _ => panic!("transition send_close on unexpected state"), } } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index a90b199..75a3bdf 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -106,9 +106,9 @@ where }; Some(Ptr { - key: Key(key), - store: self, - }) + key: Key(key), + store: self, + }) } pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { @@ -125,17 +125,13 @@ where use self::ordermap::Entry::*; match self.ids.entry(id) { - Occupied(e) => { - Entry::Occupied(OccupiedEntry { - ids: e, - }) - } - Vacant(e) => { - Entry::Vacant(VacantEntry { - ids: e, - slab: &mut self.slab, - }) - } + Occupied(e) => Entry::Occupied(OccupiedEntry { + ids: e, + }), + Vacant(e) => Entry::Vacant(VacantEntry { + ids: e, + slab: &mut self.slab, + }), } } @@ -151,9 +147,9 @@ where let key = *self.ids.get_index(i).unwrap().1; f(Ptr { - key: Key(key), - store: self, - })?; + key: Key(key), + store: self, + })?; // TODO: This logic probably could be better... let new_len = self.ids.len(); @@ -268,14 +264,14 @@ where // Update the tail pointer idxs.tail = stream.key(); - } + }, None => { trace!(" -> first entry"); self.indices = Some(store::Indices { - head: stream.key(), - tail: stream.key(), - }); - } + head: stream.key(), + tail: stream.key(), + }); + }, } true diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index df80168..8274bcb 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -217,14 +217,12 @@ where /// Returns `Err` when the decrement cannot be completed due to overflow. pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { match self.content_length { - ContentLength::Remaining(ref mut rem) => { - match rem.checked_sub(len as u64) { - Some(val) => *rem = val, - None => return Err(()), - } - } + ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { + Some(val) => *rem = val, + None => return Err(()), + }, ContentLength::Head => return Err(()), - _ => {} + _ => {}, } Ok(()) diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 82a1c20..717de81 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -66,14 +66,14 @@ where pub fn new(config: Config) -> Self { Streams { inner: Arc::new(Mutex::new(Inner { - counts: Counts::new(&config), - actions: Actions { - recv: Recv::new(&config), - send: Send::new(&config), - task: None, - }, - store: Store::new(), - })), + counts: Counts::new(&config), + actions: Actions { + recv: Recv::new(&config), + send: Send::new(&config), + task: None, + }, + store: Store::new(), + })), } } @@ -85,27 +85,29 @@ where let key = match me.store.find_entry(id) { Entry::Occupied(e) => e.key(), - Entry::Vacant(e) => { - match me.actions.recv.open(id, &mut me.counts)? { - Some(stream_id) => { - let stream = Stream::new(stream_id, - me.actions.send.init_window_sz(), - me.actions.recv.init_window_sz()); + Entry::Vacant(e) => match me.actions.recv.open(id, &mut me.counts)? { + Some(stream_id) => { + let stream = Stream::new( + stream_id, + me.actions.send.init_window_sz(), + me.actions.recv.init_window_sz(), + ); - e.insert(stream) - } - None => return Ok(()), - } - } + e.insert(stream) + }, + None => return Ok(()), + }, }; let stream = me.store.resolve(key); let actions = &mut me.actions; me.counts.transition(stream, |counts, stream| { - trace!("recv_headers; stream={:?}; state={:?}", - stream.id, - stream.state); + trace!( + "recv_headers; stream={:?}; state={:?}", + stream.id, + stream.state + ); let res = if stream.state.is_recv_headers() { actions.recv.recv_headers(frame, stream, counts) @@ -160,7 +162,7 @@ where .map_err(RecvError::Connection)?; return Ok(()); - } + }, }; let actions = &mut me.actions; @@ -211,11 +213,11 @@ where // This result is ignored as there is nothing to do when there // is an error. The stream is reset by the function on error and // the error is informational. - let _ = me.actions - .send - .recv_stream_window_update(frame.size_increment(), - &mut stream, - &mut me.actions.task); + let _ = me.actions.send.recv_stream_window_update( + frame.size_increment(), + &mut stream, + &mut me.actions.task, + ); } else { me.actions .recv @@ -255,7 +257,7 @@ where // Return the key Some(key) - } + }, None => None, } }; @@ -294,9 +296,11 @@ where try_ready!(me.actions.recv.poll_complete(&mut me.store, dst)); // Send any other pending frames - try_ready!(me.actions - .send - .poll_complete(&mut me.store, &mut me.counts, dst)); + try_ready!(me.actions.send.poll_complete( + &mut me.store, + &mut me.counts, + dst + )); // Nothing else to do, track the task me.actions.task = Some(task::current()); @@ -335,9 +339,11 @@ where // Initialize a new stream. This fails if the connection is at capacity. let stream_id = me.actions.send.open(&mut me.counts)?; - let mut stream = Stream::new(stream_id, - me.actions.send.init_window_sz(), - me.actions.recv.init_window_sz()); + let mut stream = Stream::new( + stream_id, + me.actions.send.init_window_sz(), + me.actions.recv.init_window_sz(), + ); if *request.method() == Method::HEAD { stream.content_length = ContentLength::Head; @@ -363,9 +369,9 @@ where }; Ok(StreamRef { - inner: self.inner.clone(), - key: key, - }) + inner: self.inner.clone(), + key: key, + }) } pub fn send_reset(&mut self, id: StreamId, reason: Reason) { @@ -374,16 +380,14 @@ where let key = match me.store.find_entry(id) { Entry::Occupied(e) => e.key(), - Entry::Vacant(e) => { - match me.actions.recv.open(id, &mut me.counts) { - Ok(Some(stream_id)) => { - let stream = Stream::new(stream_id, 0, 0); + Entry::Vacant(e) => match me.actions.recv.open(id, &mut me.counts) { + Ok(Some(stream_id)) => { + let stream = Stream::new(stream_id, 0, 0); - e.insert(stream) - } - _ => return, - } - } + e.insert(stream) + }, + _ => return, + }, }; let stream = me.store.resolve(key); @@ -651,8 +655,8 @@ where res: Result<(), RecvError>, ) -> Result<(), RecvError> { if let Err(RecvError::Stream { - reason, .. - }) = res + reason, .. + }) = res { // Reset the stream. self.send.send_reset(reason, stream, &mut self.task); diff --git a/src/server.rs b/src/server.rs index 293680e..f53c9f9 100644 --- a/src/server.rs +++ b/src/server.rs @@ -127,8 +127,8 @@ where // If the socket is closed, don't return anything // TODO: drop any pending streams return Ok(None.into()); - } - _ => {} + }, + _ => {}, } if let Some(inner) = self.connection.next_incoming() { @@ -296,7 +296,7 @@ where } dst.send_data(chunk, false)?; - } + }, None => { // TODO: It would be nice to not have to send an extra // frame... @@ -305,7 +305,7 @@ where } return Ok(Async::Ready(self.dst.take().unwrap())); - } + }, } } } @@ -416,12 +416,14 @@ impl proto::Peer for Peer { use http::response::Parts; // Extract the components of the HTTP request - let (Parts { - status, - headers, - .. - }, - _) = response.into_parts(); + let ( + Parts { + status, + headers, + .. + }, + _, + ) = response.into_parts(); // Build the set pseudo header set. All requests will include `method` // and `path`. @@ -500,10 +502,10 @@ impl proto::Peer for Peer { // TODO: Should there be more specialized handling for different // kinds of errors return Err(RecvError::Stream { - id: stream_id, - reason: ProtocolError, - }); - } + id: stream_id, + reason: ProtocolError, + }); + }, }; *request.headers_mut() = fields; diff --git a/tests/codec_read.rs b/tests/codec_read.rs index 98e6034..d9f6b05 100644 --- a/tests/codec_read.rs +++ b/tests/codec_read.rs @@ -124,6 +124,8 @@ fn update_max_frame_len_at_rest() { codec.set_max_recv_frame_size(2); assert_eq!(codec.max_recv_frame_size(), 2); - assert_eq!(codec.poll().unwrap_err().description(), - "frame size too big"); + assert_eq!( + codec.poll().unwrap_err().description(), + "frame size too big" + ); } diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 5e27ca8..41c3ef0 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -241,8 +241,10 @@ fn recv_data_overflows_connection_window() { // client should see a flow control error let conn = h2.then(|res| { let err = res.unwrap_err(); - assert_eq!(err.to_string(), - "protocol error: flow-control protocol violated"); + assert_eq!( + err.to_string(), + "protocol error: flow-control protocol violated" + ); Ok::<(), ()>(()) }); conn.unwrap().join(req) @@ -498,8 +500,8 @@ fn recv_window_update_on_stream_closed_by_data_frame() { // Wait for the response h2.drive(GetResponse { - stream: Some(stream), - }) + stream: Some(stream), + }) }) .and_then(|(h2, (response, mut stream))| { assert_eq!(response.status(), StatusCode::OK); @@ -514,7 +516,9 @@ fn recv_window_update_on_stream_closed_by_data_frame() { let srv = srv.assert_client_handshake() .unwrap() .recv_settings() - .recv_frame(frames::headers(1).request("POST", "https://http2.akamai.com/")) + .recv_frame( + frames::headers(1).request("POST", "https://http2.akamai.com/"), + ) .send_frame(frames::headers(1).response(200)) .recv_frame(frames::data(1, "hello").eos()) .send_frame(frames::window_update(1, 5)) @@ -553,8 +557,8 @@ fn reserved_capacity_assigned_in_multi_window_updates() { stream.send_data("world".into(), true).unwrap(); h2.drive(GetResponse { - stream: Some(stream), - }) + stream: Some(stream), + }) }) .and_then(|(h2, (response, _))| { assert_eq!(response.status(), StatusCode::NO_CONTENT); diff --git a/tests/prioritization.rs b/tests/prioritization.rs index edba3b4..fb9095d 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -193,7 +193,10 @@ fn send_data_receive_window_update() { stream.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize); // Wait for capacity - h2.drive(util::wait_for_capacity(stream, frame::DEFAULT_INITIAL_WINDOW_SIZE as usize)) + h2.drive(util::wait_for_capacity( + stream, + frame::DEFAULT_INITIAL_WINDOW_SIZE as usize, + )) }) .and_then(|(h2, mut stream)| { let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; diff --git a/tests/stream_states.rs b/tests/stream_states.rs index 1c120ff..cf0be67 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -184,9 +184,11 @@ fn closed_streams_are_released() { let srv = srv.assert_client_handshake() .unwrap() .recv_settings() - .recv_frame(frames::headers(1) - .request("GET", "https://example.com/") - .eos()) + .recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) .send_frame(frames::headers(1).response(204).eos()) .close();