From b0e6867877476563b7c285b8d12b4d99b199d50b Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 24 Aug 2017 15:52:01 -0700 Subject: [PATCH] Fix warnings --- src/client.rs | 13 ++++++------ src/frame/headers.rs | 9 ++------ src/frame/mod.rs | 16 +++------------ src/frame/priority.rs | 8 -------- src/frame/settings.rs | 7 +------ src/frame/window_update.rs | 3 +-- src/hpack/decoder.rs | 1 + src/hpack/encoder.rs | 1 + src/lib.rs | 34 +------------------------------ src/proto/codec.rs | 1 - src/proto/connection.rs | 33 +++++------------------------- src/proto/framed_read.rs | 4 ++-- src/proto/framed_write.rs | 13 ++++++------ src/proto/mod.rs | 29 +++----------------------- src/proto/ping_pong.rs | 2 ++ src/proto/settings.rs | 4 +++- src/proto/streams/buffer.rs | 2 +- src/proto/streams/flow_control.rs | 19 ++--------------- src/proto/streams/mod.rs | 3 ++- src/proto/streams/prioritize.rs | 15 +++++--------- src/proto/streams/recv.rs | 14 ++++++------- src/proto/streams/send.rs | 25 +++++++++-------------- src/proto/streams/state.rs | 2 -- src/proto/streams/store.rs | 32 ++++++++--------------------- src/proto/streams/streams.rs | 24 +++++++++++----------- src/server.rs | 15 ++++++-------- 26 files changed, 91 insertions(+), 238 deletions(-) diff --git a/src/client.rs b/src/client.rs index e2049c2..a9eafc0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,10 +1,11 @@ -use {frame, ConnectionError, StreamId}; +use {frame, ConnectionError}; use Body; +use frame::StreamId; use proto::{self, Connection, WindowSize}; use error::Reason::*; -use http::{self, Request, Response}; -use futures::{self, Future, Poll, Sink, Async, AsyncSink}; +use http::{Request, Response}; +use futures::{Future, Poll, Sink, Async, AsyncSink}; use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{Bytes, IntoBuf}; @@ -148,9 +149,7 @@ impl Stream { } /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: usize) - -> Result<(), ConnectionError> - { + pub fn reserve_capacity(&mut self, capacity: usize) { // TODO: Check for overflow self.inner.reserve_capacity(capacity as WindowSize) } @@ -174,7 +173,7 @@ impl Stream { } /// Send trailers - pub fn send_trailers(&mut self, trailers: ()) + pub fn send_trailers(&mut self, _trailers: ()) -> Result<(), ConnectionError> { unimplemented!(); diff --git a/src/frame/headers.rs b/src/frame/headers.rs index fdf5a67..99edb6e 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -3,11 +3,10 @@ use hpack; use frame::{self, Frame, Head, Kind, Error}; use HeaderMap; -use http::{self, request, response, version, uri, Method, StatusCode, Uri}; -use http::{Request, Response}; +use http::{self, version, uri, Request, Response, Method, StatusCode, Uri}; use http::header::{self, HeaderName, HeaderValue}; -use bytes::{BytesMut, Bytes, Buf}; +use bytes::{BytesMut, Bytes}; use byteorder::{BigEndian, ByteOrder}; use string::String; @@ -291,10 +290,6 @@ impl Headers { Ok(request) } - pub fn into_fields(self) -> HeaderMap { - self.fields - } - pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut) -> Option { diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 9c4999a..21f723e 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -1,7 +1,7 @@ use hpack; use error::{ConnectionError, Reason}; -use bytes::{Bytes, Buf}; +use bytes::Bytes; use std::fmt; @@ -56,6 +56,8 @@ pub use self::settings::{ DEFAULT_MAX_FRAME_SIZE, }; +pub type FrameSize = u32; + pub const HEADER_LEN: usize = 9; pub enum Frame { @@ -100,18 +102,6 @@ impl Frame { } } -impl Frame { - /// Returns the length of the frame as it applies to flow control. - pub fn flow_len(&self) -> usize { - use self::Frame::*; - - match *self { - Data(ref frame) => frame.payload().remaining(), - _ => 0, - } - } -} - impl fmt::Debug for Frame { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { use self::Frame::*; diff --git a/src/frame/priority.rs b/src/frame/priority.rs index 2947901..253a5ef 100644 --- a/src/frame/priority.rs +++ b/src/frame/priority.rs @@ -72,12 +72,4 @@ impl StreamDependency { pub fn dependency_id(&self) -> StreamId { self.dependency_id } - - pub fn weight(&self) -> u8 { - self.weight - } - - pub fn is_exclusive(&self) -> bool { - self.is_exclusive - } } diff --git a/src/frame/settings.rs b/src/frame/settings.rs index d42084c..6f9a7a5 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -1,5 +1,4 @@ -use FrameSize; -use frame::{Frame, Error, Head, Kind, StreamId}; +use frame::{Frame, FrameSize, Error, Head, Kind, StreamId}; use bytes::{BytesMut, BufMut, BigEndian}; #[derive(Debug, Clone, Default, Eq, PartialEq)] @@ -54,10 +53,6 @@ impl Settings { self.flags.is_ack() } - pub fn enable_push(&self) -> Option { - self.enable_push.map(|n| n != 0) - } - pub fn initial_window_size(&self) -> Option { self.initial_window_size } diff --git a/src/frame/window_update.rs b/src/frame/window_update.rs index fc4bd39..19dffb8 100644 --- a/src/frame/window_update.rs +++ b/src/frame/window_update.rs @@ -1,5 +1,4 @@ -use StreamId; -use frame::{self, Head, Kind, Error}; +use frame::{self, Head, Kind, Error, StreamId}; use bytes::{BufMut, BigEndian}; diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index 5784466..257a263 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -151,6 +151,7 @@ impl Decoder { } /// Queues a potential size update + #[allow(dead_code)] pub fn queue_size_update(&mut self, size: usize) { let size = match self.max_size_update { Some(v) => cmp::max(v, size), diff --git a/src/hpack/encoder.rs b/src/hpack/encoder.rs index aa7f61f..9c597fd 100644 --- a/src/hpack/encoder.rs +++ b/src/hpack/encoder.rs @@ -44,6 +44,7 @@ impl Encoder { /// Queues a max size update. /// /// The next call to `encode` will include a dynamic size update frame. + #[allow(dead_code)] pub fn update_max_size(&mut self, val: usize) { match self.size_update { Some(SizeUpdate::One(old)) => { diff --git a/src/lib.rs b/src/lib.rs index 36a5b42..1892e8d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,4 @@ -#![allow(warnings)] -// #![deny(missing_debug_implementations)] +#![deny(warnings, missing_debug_implementations)] #[macro_use] extern crate futures; @@ -35,11 +34,9 @@ mod frame; pub mod server; pub use error::{ConnectionError, Reason}; -pub use frame::StreamId; use bytes::Bytes; -pub type FrameSize = u32; // TODO: remove if carllerche/http#90 lands pub type HeaderMap = http::HeaderMap; @@ -74,32 +71,3 @@ impl futures::Stream for Body { self.inner.poll_data() } } - -// TODO: Delete below - -/// An H2 connection frame -#[derive(Debug)] -pub enum Frame { - Headers { - id: StreamId, - headers: T, - end_of_stream: bool, - }, - Data { - id: StreamId, - data: B, - end_of_stream: bool, - }, - Trailers { - id: StreamId, - headers: HeaderMap, - }, - PushPromise { - id: StreamId, - promised_id: StreamId, - }, - Reset { - id: StreamId, - error: Reason, - }, -} diff --git a/src/proto/codec.rs b/src/proto/codec.rs index ae20194..daf0afb 100644 --- a/src/proto/codec.rs +++ b/src/proto/codec.rs @@ -56,7 +56,6 @@ impl futures::Stream for Codec type Error = ConnectionError; fn poll(&mut self) -> Poll, ConnectionError> { - use futures::Stream; self.inner.poll() } } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 9bfd9fb..55616c0 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,10 +1,9 @@ -use {client, server, ConnectionError, Frame}; -use HeaderMap; -use frame::{self, StreamId}; +use {client, frame, server, ConnectionError}; use proto::*; -use http::{Request, Response}; +use http::Request; +use futures::{Sink, Stream}; use bytes::{Bytes, IntoBuf}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -163,10 +162,10 @@ impl Connection trace!("recv SETTINGS; frame={:?}", frame); self.settings.recv_settings(frame); } - Some(GoAway(frame)) => { + Some(GoAway(_)) => { // TODO: handle the last_processed_id. Also, should this be // handled as an error? - let e = ConnectionError::Proto(frame.reason()); + // let _ = ConnectionError::Proto(frame.reason()); return Ok(().into()); } Some(Ping(frame)) => { @@ -220,21 +219,6 @@ impl Connection } } } - - fn convert_poll_message(frame: frame::Headers) -> Result, ConnectionError> { - if frame.is_trailers() { - Ok(Frame::Trailers { - id: frame.stream_id(), - headers: frame.into_fields() - }) - } else { - Ok(Frame::Headers { - id: frame.stream_id(), - end_of_stream: frame.is_end_stream(), - headers: P::convert_poll_message(frame)?, - }) - } - } } impl Connection @@ -261,13 +245,6 @@ impl Connection // ====== impl State ===== impl State { - fn is_open(&self) -> bool { - match *self { - State::Open => true, - _ => false, - } - } - fn error(&self) -> Option { match *self { State::Error(reason) => Some(reason), diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index c5d1e40..32376e7 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -6,12 +6,12 @@ use error::Reason::*; use futures::*; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use tokio_io::AsyncRead; use tokio_io::codec::length_delimited; -use std::io::{self, Cursor}; +use std::io; #[derive(Debug)] pub struct FramedRead { diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index e33a0af..3a173f4 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -1,12 +1,11 @@ -use {hpack, ConnectionError, FrameSize}; +use {hpack, ConnectionError}; use error::User::*; -use frame::{self, Frame}; +use frame::{self, Frame, FrameSize}; use futures::*; use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{BytesMut, Buf, BufMut}; -use std::cmp; use std::io::{self, Cursor}; #[derive(Debug)] @@ -176,9 +175,11 @@ impl Sink for FramedWrite trace!("encoded window_update; rem={:?}", self.buf.remaining()); } - Frame::Priority(v) => { - // v.encode(self.buf.get_mut()); + Frame::Priority(_) => { + /* + v.encode(self.buf.get_mut()); trace!("encoded priority; rem={:?}", self.buf.remaining()); + */ unimplemented!(); } Frame::Reset(v) => { @@ -210,7 +211,7 @@ impl Sink for FramedWrite Some(Next::Data(frame)) => { self.last_data_frame = Some(frame); } - Some(Next::Continuation(frame)) => { + Some(Next::Continuation(_)) => { unimplemented!(); } None => {} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index c8429d7..dff1321 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -16,11 +16,11 @@ use self::ping_pong::PingPong; use self::settings::Settings; use self::streams::Prioritized; -use {StreamId, ConnectionError}; +use ConnectionError; use error::Reason; -use frame::{self, Frame}; +use frame::{self, Frame, StreamId}; -use futures::{self, task, Poll, Async, AsyncSink, Sink, Stream as Stream2}; +use futures::{self, task, Poll, Async, AsyncSink}; use futures::task::Task; use bytes::{Buf, IntoBuf}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -50,12 +50,6 @@ pub type PingPayload = [u8; 8]; pub type WindowSize = u32; -#[derive(Debug, Copy, Clone)] -pub struct WindowUpdate { - stream_id: StreamId, - increment: WindowSize, -} - // Constants pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; @@ -94,20 +88,3 @@ pub(crate) fn from_framed_write(framed_write: FramedWrite WindowUpdate { - WindowUpdate { - stream_id, - increment - } - } - - pub fn stream_id(&self) -> StreamId { - self.stream_id - } - - pub fn increment(&self) -> WindowSize { - self.increment - } -} diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index d50d9ca..88f296a 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -2,6 +2,8 @@ use ConnectionError; use frame::Ping; use proto::*; +use futures::Sink; + /// Acknowledges ping requests from the remote. #[derive(Debug)] pub struct PingPong { diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 69683fd..0beb685 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,6 +1,8 @@ use {frame, ConnectionError}; use proto::*; +use futures::Sink; + #[derive(Debug)] pub(crate) struct Settings { /// Received SETTINGS frame pending processing. The ACK must be written to @@ -45,7 +47,7 @@ impl Settings { trace!("ACK sent"); dst.apply_remote_settings(settings); - streams.apply_remote_settings(settings); + streams.apply_remote_settings(settings)?; } self.pending = None; diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 0134e7c..2e81501 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -1,4 +1,4 @@ -use frame::{self, Frame}; +use frame::Frame; use slab::Slab; diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index 19eb2a1..521be38 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -2,8 +2,6 @@ use ConnectionError; use proto::*; use error::Reason::*; -use std::cmp; - #[derive(Copy, Clone, Debug)] pub struct FlowControl { /// Window size as indicated by the peer. This can go negative. @@ -49,21 +47,8 @@ impl FlowControl { self.available -= capacity; } - pub fn assign_capacity(&mut self, capacity: WindowSize) - -> Result<(), ConnectionError> - { - let (val, overflow) = self.available.overflowing_add(capacity); - - if overflow { - return Err(FlowControlError.into()); - } - - if val > MAX_WINDOW_SIZE { - return Err(FlowControlError.into()); - } - - self.available = val; - Ok(()) + pub fn assign_capacity(&mut self, capacity: WindowSize) { + self.available + capacity; } /// Returns the number of bytes available but not assigned to the window. diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 2f52cd6..9e09002 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -20,7 +20,8 @@ use self::state::State; use self::store::{Store, Entry}; use self::stream::Stream; -use {frame, StreamId, ConnectionError}; +use {frame, ConnectionError}; +use frame::StreamId; use proto::*; use error::Reason::*; use error::User::*; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 0377cec..c138d4c 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -1,6 +1,7 @@ use super::*; use bytes::buf::Take; +use futures::Sink; use std::{fmt, cmp}; @@ -40,8 +41,7 @@ impl Prioritize flow.inc_window(config.init_local_window_sz) .ok().expect("invalid initial window size"); - flow.assign_capacity(config.init_local_window_sz) - .ok().expect("invalid initial window size"); + flow.assign_capacity(config.init_local_window_sz); Prioritize { pending_send: store::Queue::new(), @@ -131,16 +131,13 @@ impl Prioritize } /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) - -> Result<(), ConnectionError> - { + pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { // Actual capacity is `capacity` + the current amount of buffered data. // It it were less, then we could never send out the buffered data. let capacity = capacity + stream.buffered_send_data; if capacity == stream.requested_send_capacity { // Nothing to do - return Ok(()); } else if capacity < stream.requested_send_capacity { // TODO: release capacity unimplemented!(); @@ -152,8 +149,6 @@ impl Prioritize // currently available, the stream will be queued to receive some // when more becomes available. self.try_assign_capacity(stream); - - Ok(()) } } @@ -184,7 +179,7 @@ impl Prioritize { // Update the connection's window self.flow.inc_window(inc)?; - self.flow.assign_capacity(inc)?; + self.flow.assign_capacity(inc); // Assign newly acquired capacity to streams pending capacity. while self.flow.available() > 0 { @@ -212,7 +207,7 @@ impl Prioritize // The amount of additional capacity that the stream requests. // Don't assign more than the window has available! - let mut additional = cmp::min( + let additional = cmp::min( total_requested - stream.send_flow.available(), stream.send_flow.window_size()); diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 1961790..79e98e1 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -3,8 +3,8 @@ use proto::*; use super::*; use error::Reason::*; +use futures::Sink; -use std::collections::VecDeque; use std::marker::PhantomData; #[derive(Debug)] @@ -58,7 +58,8 @@ impl Recv where B: Buf { let mut flow = FlowControl::new(); - flow.inc_window(config.init_remote_window_sz); + flow.inc_window(config.init_remote_window_sz) + .ok().expect("invalid initial remote window size"); flow.assign_capacity(config.init_remote_window_sz); Recv { @@ -127,7 +128,7 @@ impl Recv where B: Buf { // frame vs. now. Ok(client::Peer::convert_poll_message(v)?.into()) } - Some(frame) => unimplemented!(), + Some(_) => unimplemented!(), None => { stream.state.ensure_recv_open()?; @@ -187,7 +188,7 @@ impl Recv where B: Buf { -> Result<(), ConnectionError> { // Transition the state - stream.state.recv_close(); + stream.state.recv_close()?; // Push the frame onto the stream's recv buffer stream.pending_recv.push_back(&mut self.buffer, frame.into()); @@ -199,7 +200,6 @@ impl Recv where B: Buf { pub fn release_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr, - send: &mut Send, task: &mut Option) -> Result<(), ConnectionError> { @@ -315,7 +315,7 @@ impl Recv where B: Buf { send.init_window_sz(), self.init_window_sz); - new_stream.state.reserve_remote(); + new_stream.state.reserve_remote()?; let mut ppp = store[stream].pending_push_promises.take(); @@ -471,7 +471,7 @@ impl Recv where B: Buf { let frame = frame::WindowUpdate::new(StreamId::zero(), incr); if dst.start_send(frame.into())?.is_ready() { - self.flow.inc_window(incr); + self.flow.inc_window(incr).ok().expect("unexpected flow control state"); } else { return Ok(Async::NotReady); } diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index aae8101..fdab877 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -7,9 +7,6 @@ use error::User::*; use bytes::Buf; -use std::collections::VecDeque; -use std::marker::PhantomData; - /// Manages state transitions related to outbound frames. #[derive(Debug)] pub(super) struct Send { @@ -104,12 +101,6 @@ impl Send where B: Buf { Ok(()) } - pub fn send_eos(&mut self, stream: &mut Stream) - -> Result<(), ConnectionError> - { - stream.state.send_close() - } - pub fn send_reset(&mut self, reason: Reason, stream: &mut store::Ptr, task: &mut Option) @@ -124,6 +115,7 @@ impl Send where B: Buf { let frame = frame::Reset::new(stream.id, reason); + // TODO: This could impact connection level flow control. self.prioritize.clear_queue(stream); trace!("send_reset -- queueing; frame={:?}", frame); @@ -151,9 +143,7 @@ impl Send where B: Buf { } /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) - -> Result<(), ConnectionError> - { + pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { self.prioritize.reserve_capacity(capacity, stream) } @@ -211,6 +201,7 @@ impl Send where B: Buf { settings: &frame::Settings, store: &mut Store, task: &mut Option) + -> Result<(), ConnectionError> { if let Some(val) = settings.max_concurrent_streams() { self.max_streams = Some(val as usize); @@ -251,15 +242,19 @@ impl Send where B: Buf { // TODO: Should this notify the producer? } - }); + + Ok(()) + })?; } else if val > old_val { let inc = val - old_val; store.for_each(|mut stream| { - self.recv_stream_window_update(inc, &mut stream, task); - }); + self.recv_stream_window_update(inc, &mut stream, task) + })?; } } + + Ok(()) } pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), ConnectionError> { diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 10944f7..3635fea 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -2,8 +2,6 @@ use ConnectionError; use error::Reason; use error::Reason::*; use error::User::*; -use proto::*; -use super::FlowControl; use self::Inner::*; use self::Peer::*; diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 6388f77..82e7f86 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -49,13 +49,12 @@ struct Indices { } pub(super) enum Entry<'a, B: 'a> { - Occupied(OccupiedEntry<'a, B>), + Occupied(OccupiedEntry<'a>), Vacant(VacantEntry<'a, B>), } -pub(super) struct OccupiedEntry<'a, B: 'a> { +pub(super) struct OccupiedEntry<'a> { ids: hash_map::OccupiedEntry<'a, StreamId, usize>, - slab: &'a mut slab::Slab>, } pub(super) struct VacantEntry<'a, B: 'a> { @@ -108,7 +107,6 @@ impl Store { Occupied(e) => { Entry::Occupied(OccupiedEntry { ids: e, - slab: &mut self.slab, }) } Vacant(e) => { @@ -120,15 +118,17 @@ impl Store { } } - pub fn for_each(&mut self, mut f: F) - where F: FnMut(Ptr) + pub fn for_each(&mut self, mut f: F) -> Result<(), ConnectionError> + where F: FnMut(Ptr) -> Result<(), ConnectionError>, { for &key in self.ids.values() { f(Ptr { key: Key(key), slab: &mut self.slab, - }); + })?; } + + Ok(()) } } @@ -245,10 +245,6 @@ impl<'a, B: 'a> Ptr<'a, B> { slab: &mut *self.slab, } } - - pub fn into_mut(self) -> &'a mut Stream { - &mut self.slab[self.key.0] - } } impl<'a, B: 'a> ops::Deref for Ptr<'a, B> { @@ -267,22 +263,10 @@ impl<'a, B: 'a> ops::DerefMut for Ptr<'a, B> { // ===== impl OccupiedEntry ===== -impl<'a, B> OccupiedEntry<'a, B> { +impl<'a> OccupiedEntry<'a> { pub fn key(&self) -> Key { Key(*self.ids.get()) } - - pub fn get(&self) -> &Stream { - &self.slab[*self.ids.get()] - } - - pub fn get_mut(&mut self) -> &mut Stream { - &mut self.slab[*self.ids.get()] - } - - pub fn into_mut(self) -> &'a mut Stream { - &mut self.slab[*self.ids.get()] - } } // ===== impl VacantEntry ===== diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 2606899..c51f5e9 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -2,7 +2,6 @@ use {client, server}; use proto::*; use super::*; -use std::marker::PhantomData; use std::sync::{Arc, Mutex}; #[derive(Debug)] @@ -134,7 +133,7 @@ impl Streams return Err(ProtocolError.into()); } - let mut stream = match me.store.find_mut(&id) { + let stream = match me.store.find_mut(&id) { Some(stream) => stream, None => { // TODO: Are there other error cases? @@ -159,8 +158,9 @@ impl Streams let last_processed_id = actions.recv.last_processed_id(); me.store.for_each(|mut stream| { - actions.recv.recv_err(err, &mut *stream) - }); + actions.recv.recv_err(err, &mut *stream); + Ok(()) + }).ok().expect("unexpected error processing error"); last_processed_id } @@ -180,7 +180,7 @@ impl Streams // considers closed. It's ok... if let Some(mut stream) = me.store.find_mut(&id) { me.actions.send.recv_stream_window_update( - frame.size_increment(), &mut stream, &mut me.actions.task); + frame.size_increment(), &mut stream, &mut me.actions.task)?; } else { me.actions.recv.ensure_not_idle(id)?; } @@ -197,7 +197,7 @@ impl Streams let id = frame.stream_id(); - let mut stream = match me.store.find_mut(&id) { + let stream = match me.store.find_mut(&id) { Some(stream) => stream.key(), None => return Err(ProtocolError.into()), }; @@ -253,12 +253,14 @@ impl Streams Ok(().into()) } - pub fn apply_remote_settings(&mut self, frame: &frame::Settings) { + pub fn apply_remote_settings(&mut self, frame: &frame::Settings) + -> Result<(), ConnectionError> + { let mut me = self.inner.lock().unwrap(); let me = &mut *me; me.actions.send.apply_remote_settings( - frame, &mut me.store, &mut me.actions.task); + frame, &mut me.store, &mut me.actions.task) } pub fn poll_send_request_ready(&mut self) -> Poll<(), ConnectionError> { @@ -412,13 +414,11 @@ impl StreamRef let mut stream = me.store.resolve(self.key); me.actions.recv.release_capacity( - capacity, &mut stream, &mut me.actions.send, &mut me.actions.task) + capacity, &mut stream, &mut me.actions.task) } /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: WindowSize) - -> Result<(), ConnectionError> - { + pub fn reserve_capacity(&mut self, capacity: WindowSize) { let mut me = self.inner.lock().unwrap(); let me = &mut *me; diff --git a/src/server.rs b/src/server.rs index b386559..fa4fe47 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,12 +1,13 @@ -use {frame, Body, ConnectionError, StreamId}; +use {Body, ConnectionError}; +use frame::{self, StreamId}; use proto::{self, Connection, WindowSize}; use error::Reason; use error::Reason::*; -use http::{self, Request, Response}; +use http::{Request, Response}; use futures::{self, Future, Sink, Poll, Async, AsyncSink, IntoFuture}; use tokio_io::{AsyncRead, AsyncWrite}; -use bytes::{Bytes, IntoBuf, Buf}; +use bytes::{Bytes, IntoBuf}; use std::fmt; @@ -160,9 +161,7 @@ impl Stream { } /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: usize) - -> Result<(), ConnectionError> - { + pub fn reserve_capacity(&mut self, capacity: usize) { // TODO: Check for overflow self.inner.reserve_capacity(capacity as WindowSize) } @@ -186,7 +185,7 @@ impl Stream { } /// Send trailers - pub fn send_trailers(&mut self, trailers: ()) + pub fn send_trailers(&mut self, _trailers: ()) -> Result<(), ConnectionError> { unimplemented!(); @@ -220,8 +219,6 @@ impl Future for Send type Error = ConnectionError; fn poll(&mut self) -> Poll { - use futures::Stream; - loop { if self.buf.is_none() { // Get a chunk to send to the H2 stream