rustfmt: add trailing commas in match arms, set fn call to block stle (#85)

This commit is contained in:
Sean McArthur
2017-09-12 19:29:06 -07:00
committed by Carl Lerche
parent de1edf4873
commit f7d14861e5
37 changed files with 894 additions and 973 deletions

View File

@@ -61,11 +61,11 @@ where
pub fn new(codec: Codec<T, Prioritized<B::Buf>>) -> Connection<T, P, B> {
// TODO: Actually configure
let streams = Streams::new(streams::Config {
max_remote_initiated: None,
init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
max_local_initiated: None,
init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
});
max_remote_initiated: None,
init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
max_local_initiated: None,
init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
});
Connection {
state: State::Open,
@@ -85,8 +85,10 @@ where
// The order of these calls don't really matter too much as only one
// should have pending work.
try_ready!(self.ping_pong.send_pending_pong(&mut self.codec));
try_ready!(self.settings
.send_pending_ack(&mut self.codec, &mut self.streams));
try_ready!(
self.settings
.send_pending_ack(&mut self.codec, &mut self.streams)
);
try_ready!(self.streams.send_pending_refusal(&mut self.codec));
Ok(().into())
@@ -112,7 +114,7 @@ where
try_ready!(self.streams.poll_complete(&mut self.codec));
return Ok(Async::NotReady);
}
},
// Attempting to read a frame resulted in a connection level
// error. This is handled by setting a GO_AWAY frame followed by
// terminating the connection.
@@ -127,17 +129,17 @@ where
// Transition to the going away state.
self.state = State::GoAway(frame);
}
},
// Attempting to read a frame resulted in a stream level error.
// This is handled by resetting the frame then trying to read
// another frame.
Err(Stream {
id,
reason,
}) => {
id,
reason,
}) => {
trace!("stream level error; id={:?}; reason={:?}", id, reason);
self.streams.send_reset(id, reason);
}
},
// Attempting to read a frame resulted in an I/O error. All
// active streams must be reset.
//
@@ -150,9 +152,9 @@ where
// Return the error
return Err(e);
}
},
}
}
},
State::GoAway(frame) => {
// Ensure the codec is ready to accept the frame
try_ready!(self.codec.poll_ready());
@@ -165,17 +167,17 @@ where
// GO_AWAY sent, transition the connection to an errored state
self.state = State::Flush(frame.reason());
}
},
State::Flush(reason) => {
// Flush the codec
try_ready!(self.codec.flush());
// Transition the state to error
self.state = State::Error(reason);
}
},
State::Error(reason) => {
return Err(reason.into());
}
},
}
}
}
@@ -191,46 +193,46 @@ where
Some(Headers(frame)) => {
trace!("recv HEADERS; frame={:?}", frame);
self.streams.recv_headers(frame)?;
}
},
Some(Data(frame)) => {
trace!("recv DATA; frame={:?}", frame);
self.streams.recv_data(frame)?;
}
},
Some(Reset(frame)) => {
trace!("recv RST_STREAM; frame={:?}", frame);
self.streams.recv_reset(frame)?;
}
},
Some(PushPromise(frame)) => {
trace!("recv PUSH_PROMISE; frame={:?}", frame);
self.streams.recv_push_promise(frame)?;
}
},
Some(Settings(frame)) => {
trace!("recv SETTINGS; frame={:?}", frame);
self.settings.recv_settings(frame);
}
},
Some(GoAway(_)) => {
// TODO: handle the last_processed_id. Also, should this be
// handled as an error?
// let _ = RecvError::Proto(frame.reason());
return Ok(().into());
}
},
Some(Ping(frame)) => {
trace!("recv PING; frame={:?}", frame);
self.ping_pong.recv_ping(frame);
}
},
Some(WindowUpdate(frame)) => {
trace!("recv WINDOW_UPDATE; frame={:?}", frame);
self.streams.recv_window_update(frame)?;
}
},
Some(Priority(frame)) => {
trace!("recv PRIORITY; frame={:?}", frame);
// TODO: handle
}
},
None => {
// TODO: Is this correct?
trace!("codec closed");
return Ok(Async::Ready(()));
}
},
}
}
}

View File

@@ -50,41 +50,41 @@ impl<T> Deque<T> {
pub fn push_back(&mut self, buf: &mut Buffer<T>, value: T) {
let key = buf.slab.insert(Slot {
value,
next: None,
});
value,
next: None,
});
match self.indices {
Some(ref mut idxs) => {
buf.slab[idxs.tail].next = Some(key);
idxs.tail = key;
}
},
None => {
self.indices = Some(Indices {
head: key,
tail: key,
});
}
head: key,
tail: key,
});
},
}
}
pub fn push_front(&mut self, buf: &mut Buffer<T>, value: T) {
let key = buf.slab.insert(Slot {
value,
next: None,
});
value,
next: None,
});
match self.indices {
Some(ref mut idxs) => {
buf.slab[key].next = Some(idxs.head);
idxs.head = key;
}
},
None => {
self.indices = Some(Indices {
head: key,
tail: key,
});
}
head: key,
tail: key,
});
},
}
}
@@ -102,7 +102,7 @@ impl<T> Deque<T> {
}
return Some(slot.value);
}
},
None => None,
}
}

View File

@@ -110,10 +110,12 @@ impl FlowControl {
return Err(Reason::FlowControlError);
}
trace!("inc_window; sz={}; old={}; new={}",
sz,
self.window_size,
val);
trace!(
"inc_window; sz={}; old={}; new={}",
sz,
self.window_size,
val
);
self.window_size = val;
Ok(())
@@ -131,10 +133,12 @@ impl FlowControl {
/// Decrements the window reflecting data has actually been sent. The caller
/// must ensure that the window has capacity.
pub fn send_data(&mut self, sz: WindowSize) {
trace!("send_data; sz={}; window={}; available={}",
sz,
self.window_size,
self.available);
trace!(
"send_data; sz={}; window={}; available={}",
sz,
self.window_size,
self.available
);
// Ensure that the argument is correct
assert!(sz <= self.window_size as WindowSize);

View File

@@ -111,10 +111,12 @@ where
// Update the buffered data counter
stream.buffered_send_data += sz;
trace!("send_data; sz={}; buffered={}; requested={}",
sz,
stream.buffered_send_data,
stream.requested_send_capacity);
trace!(
"send_data; sz={}; buffered={}; requested={}",
sz,
stream.buffered_send_data,
stream.requested_send_capacity
);
// Implicitly request more send capacity if not enough has been
// requested yet.
@@ -130,9 +132,11 @@ where
self.reserve_capacity(0, stream);
}
trace!("send_data (2); available={}; buffered={}",
stream.send_flow.available(),
stream.buffered_send_data);
trace!(
"send_data (2); available={}; buffered={}",
stream.send_flow.available(),
stream.buffered_send_data
);
if stream.send_flow.available() >= stream.buffered_send_data {
// The stream currently has capacity to send the data frame, so
@@ -152,11 +156,13 @@ where
/// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr<B, P>) {
trace!("reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
stream.id,
capacity,
capacity + stream.buffered_send_data,
stream.requested_send_capacity);
trace!(
"reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
stream.id,
capacity,
capacity + stream.buffered_send_data,
stream.requested_send_capacity
);
// Actual capacity is `capacity` + the current amount of buffered data.
// It it were less, then we could never send out the buffered data.
@@ -196,11 +202,13 @@ where
inc: WindowSize,
stream: &mut store::Ptr<B, P>,
) -> Result<(), Reason> {
trace!("recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
stream.id,
stream.state,
inc,
stream.send_flow);
trace!(
"recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
stream.id,
stream.state,
inc,
stream.send_flow
);
// Update the stream level flow control.
stream.send_flow.inc_window(inc)?;
@@ -254,16 +262,20 @@ where
// The amount of additional capacity that the stream requests.
// Don't assign more than the window has available!
let additional = cmp::min(total_requested - stream.send_flow.available(),
// Can't assign more than what is available
stream.send_flow.window_size() - stream.send_flow.available());
let additional = cmp::min(
total_requested - stream.send_flow.available(),
// Can't assign more than what is available
stream.send_flow.window_size() - stream.send_flow.available(),
);
trace!("try_assign_capacity; requested={}; additional={}; buffered={}; window={}; conn={}",
total_requested,
additional,
stream.buffered_send_data,
stream.send_flow.window_size(),
self.flow.available());
trace!(
"try_assign_capacity; requested={}; additional={}; buffered={}; window={}; conn={}",
total_requested,
additional,
stream.buffered_send_data,
stream.send_flow.window_size(),
self.flow.available()
);
if additional == 0 {
// Nothing more to do
@@ -273,9 +285,11 @@ where
// If the stream has requested capacity, then it must be in the
// streaming state (more data could be sent) or there is buffered data
// waiting to be sent.
debug_assert!(stream.state.is_send_streaming() || stream.buffered_send_data > 0,
"state={:?}",
stream.state);
debug_assert!(
stream.state.is_send_streaming() || stream.buffered_send_data > 0,
"state={:?}",
stream.state
);
// The amount of currently available capacity on the connection
let conn_available = self.flow.available();
@@ -296,12 +310,13 @@ where
self.flow.claim_capacity(assign);
}
trace!("try_assign_capacity; available={}; requested={}; buffered={}; \
has_unavailable={:?}",
stream.send_flow.available(),
stream.requested_send_capacity,
stream.buffered_send_data,
stream.send_flow.has_unavailable());
trace!(
"try_assign_capacity; available={}; requested={}; buffered={}; has_unavailable={:?}",
stream.send_flow.available(),
stream.requested_send_capacity,
stream.buffered_send_data,
stream.send_flow.has_unavailable()
);
if stream.send_flow.available() < stream.requested_send_capacity {
if stream.send_flow.has_unavailable() {
@@ -367,7 +382,7 @@ where
// Because, always try to reclaim...
self.reclaim_frame(store, dst);
}
},
None => {
// Try to flush the codec.
try_ready!(dst.flush());
@@ -379,7 +394,7 @@ where
// No need to poll ready as poll_complete() does this for
// us...
}
},
}
}
}
@@ -400,9 +415,11 @@ where
// First check if there are any data chunks to take back
if let Some(frame) = dst.take_last_data_frame() {
trace!(" -> reclaimed; frame={:?}; sz={}",
frame,
frame.payload().remaining());
trace!(
" -> reclaimed; frame={:?}; sz={}",
frame,
frame.payload().remaining()
);
let mut eos = false;
let key = frame.payload().stream;
@@ -474,20 +491,24 @@ where
let stream_capacity = stream.send_flow.available();
let sz = frame.payload().remaining();
trace!(" --> data frame; stream={:?}; sz={}; eos={:?}; \
window={}; available={}; requested={}",
frame.stream_id(),
sz,
frame.is_end_stream(),
stream_capacity,
stream.send_flow.available(),
stream.requested_send_capacity);
trace!(
" --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \
available={}; requested={}",
frame.stream_id(),
sz,
frame.is_end_stream(),
stream_capacity,
stream.send_flow.available(),
stream.requested_send_capacity
);
// Zero length data frames always have capacity to
// be sent.
if sz > 0 && stream_capacity == 0 {
trace!(" --> stream capacity is 0; requested={}",
stream.requested_send_capacity);
trace!(
" --> stream capacity is 0; requested={}",
stream.requested_send_capacity
);
// Ensure that the stream is waiting for
// connection level capacity
@@ -551,7 +572,7 @@ where
stream: stream.key(),
}
}))
}
},
frame => frame.map(|_| unreachable!()),
};
@@ -568,7 +589,7 @@ where
counts.transition_after(stream, is_counted);
return Some(frame);
}
},
None => return None,
}
}

View File

@@ -150,7 +150,7 @@ where
Ok(v) => v,
Err(_) => {
unimplemented!();
}
},
};
stream.content_length = ContentLength::Remaining(content_length);
@@ -185,9 +185,9 @@ where
if stream.ensure_content_length_zero().is_err() {
return Err(RecvError::Stream {
id: stream.id,
reason: ProtocolError,
});
id: stream.id,
reason: ProtocolError,
});
}
let trailers = frame.into_fields();
@@ -264,10 +264,12 @@ where
return Err(RecvError::Connection(ProtocolError));
}
trace!("recv_data; size={}; connection={}; stream={}",
sz,
self.flow.window_size(),
stream.recv_flow.window_size());
trace!(
"recv_data; size={}; connection={}; stream={}",
sz,
self.flow.window_size(),
stream.recv_flow.window_size()
);
// Ensure that there is enough capacity on the connection before acting
// on the stream.
@@ -286,17 +288,17 @@ where
if stream.dec_content_length(frame.payload().len()).is_err() {
return Err(RecvError::Stream {
id: stream.id,
reason: ProtocolError,
});
id: stream.id,
reason: ProtocolError,
});
}
if frame.is_end_stream() {
if stream.ensure_content_length_zero().is_err() {
return Err(RecvError::Stream {
id: stream.id,
reason: ProtocolError,
});
id: stream.id,
reason: ProtocolError,
});
}
if stream.state.recv_close().is_err() {
@@ -343,9 +345,11 @@ where
// TODO: All earlier stream IDs should be implicitly closed.
// Now, create a new entry for the stream
let mut new_stream = Stream::new(frame.promised_id(),
send.init_window_sz(),
self.init_window_sz);
let mut new_stream = Stream::new(
frame.promised_id(),
send.init_window_sz(),
self.init_window_sz,
);
new_stream.state.reserve_remote()?;
@@ -559,7 +563,7 @@ where
// No more data frames
Ok(None.into())
}
},
None => {
if stream.state.is_recv_closed() {
// No more data frames will be received
@@ -569,7 +573,7 @@ where
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
}
}
},
}
}
@@ -584,7 +588,7 @@ where
// the entire set of data frames have been consumed. What should
// we do?
unimplemented!();
}
},
None => {
if stream.state.is_recv_closed() {
// There will be no trailer frame
@@ -594,7 +598,7 @@ where
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
}
}
},
}
}
}
@@ -630,7 +634,7 @@ where
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
}
},
}
}
}

View File

@@ -70,9 +70,11 @@ where
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
) -> Result<(), UserError> {
trace!("send_headers; frame={:?}; init_window={:?}",
frame,
self.init_window_sz);
trace!(
"send_headers; frame={:?}; init_window={:?}",
frame,
self.init_window_sz
);
let end_stream = frame.is_end_stream();
@@ -261,10 +263,12 @@ where
let stream = &mut *stream;
stream.send_flow.dec_window(dec);
trace!("decremented stream window; id={:?}; decr={}; flow={:?}",
stream.id,
dec,
stream.send_flow);
trace!(
"decremented stream window; id={:?}; decr={}; flow={:?}",
stream.id,
dec,
stream.send_flow
);
// TODO: Probably try to assign capacity?

View File

@@ -83,40 +83,34 @@ impl State {
let local = Peer::Streaming;
self.inner = match self.inner {
Idle => {
if eos {
HalfClosedLocal(AwaitingHeaders)
} else {
Open {
local,
remote: AwaitingHeaders,
}
Idle => if eos {
HalfClosedLocal(AwaitingHeaders)
} else {
Open {
local,
remote: AwaitingHeaders,
}
}
},
Open {
local: AwaitingHeaders,
remote,
} => {
if eos {
HalfClosedLocal(remote)
} else {
Open {
local,
remote,
}
} => if eos {
HalfClosedLocal(remote)
} else {
Open {
local,
remote,
}
}
HalfClosedRemote(AwaitingHeaders) => {
if eos {
Closed(None)
} else {
HalfClosedRemote(local)
}
}
},
HalfClosedRemote(AwaitingHeaders) => if eos {
Closed(None)
} else {
HalfClosedRemote(local)
},
_ => {
// All other transitions result in a protocol error
return Err(UnexpectedFrameType);
}
},
};
return Ok(());
@@ -142,7 +136,7 @@ impl State {
remote,
}
}
}
},
ReservedRemote => {
initial = true;
@@ -154,31 +148,27 @@ impl State {
remote,
}
}
}
},
Open {
local,
remote: AwaitingHeaders,
} => {
if eos {
HalfClosedRemote(local)
} else {
Open {
local,
remote,
}
} => if eos {
HalfClosedRemote(local)
} else {
Open {
local,
remote,
}
}
HalfClosedLocal(AwaitingHeaders) => {
if eos {
Closed(None)
} else {
HalfClosedLocal(remote)
}
}
},
HalfClosedLocal(AwaitingHeaders) => if eos {
Closed(None)
} else {
HalfClosedLocal(remote)
},
_ => {
// All other transitions result in a protocol error
return Err(RecvError::Connection(ProtocolError));
}
},
};
return Ok(initial);
@@ -190,7 +180,7 @@ impl State {
Idle => {
self.inner = ReservedRemote;
Ok(())
}
},
_ => Err(RecvError::Connection(ProtocolError)),
}
}
@@ -205,12 +195,12 @@ impl State {
trace!("recv_close: Open => HalfClosedRemote({:?})", local);
self.inner = HalfClosedRemote(local);
Ok(())
}
},
HalfClosedLocal(..) => {
trace!("recv_close: HalfClosedLocal => Closed");
self.inner = Closed(None);
Ok(())
}
},
_ => Err(RecvError::Connection(ProtocolError)),
}
}
@@ -219,14 +209,14 @@ impl State {
use proto::Error::*;
match self.inner {
Closed(..) => {}
Closed(..) => {},
_ => {
trace!("recv_err; err={:?}", err);
self.inner = Closed(match *err {
Proto(reason) => Some(Cause::Proto(reason)),
Io(..) => Some(Cause::Io),
});
}
Proto(reason) => Some(Cause::Proto(reason)),
Io(..) => Some(Cause::Io),
});
},
}
}
@@ -239,11 +229,11 @@ impl State {
// The remote side will continue to receive data.
trace!("send_close: Open => HalfClosedLocal({:?})", remote);
self.inner = HalfClosedLocal(remote);
}
},
HalfClosedRemote(..) => {
trace!("send_close: HalfClosedRemote => Closed");
self.inner = Closed(None);
}
},
_ => panic!("transition send_close on unexpected state"),
}
}

View File

@@ -106,9 +106,9 @@ where
};
Some(Ptr {
key: Key(key),
store: self,
})
key: Key(key),
store: self,
})
}
pub fn insert(&mut self, id: StreamId, val: Stream<B, P>) -> Ptr<B, P> {
@@ -125,17 +125,13 @@ where
use self::ordermap::Entry::*;
match self.ids.entry(id) {
Occupied(e) => {
Entry::Occupied(OccupiedEntry {
ids: e,
})
}
Vacant(e) => {
Entry::Vacant(VacantEntry {
ids: e,
slab: &mut self.slab,
})
}
Occupied(e) => Entry::Occupied(OccupiedEntry {
ids: e,
}),
Vacant(e) => Entry::Vacant(VacantEntry {
ids: e,
slab: &mut self.slab,
}),
}
}
@@ -151,9 +147,9 @@ where
let key = *self.ids.get_index(i).unwrap().1;
f(Ptr {
key: Key(key),
store: self,
})?;
key: Key(key),
store: self,
})?;
// TODO: This logic probably could be better...
let new_len = self.ids.len();
@@ -268,14 +264,14 @@ where
// Update the tail pointer
idxs.tail = stream.key();
}
},
None => {
trace!(" -> first entry");
self.indices = Some(store::Indices {
head: stream.key(),
tail: stream.key(),
});
}
head: stream.key(),
tail: stream.key(),
});
},
}
true

View File

@@ -217,14 +217,12 @@ where
/// Returns `Err` when the decrement cannot be completed due to overflow.
pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
match self.content_length {
ContentLength::Remaining(ref mut rem) => {
match rem.checked_sub(len as u64) {
Some(val) => *rem = val,
None => return Err(()),
}
}
ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
Some(val) => *rem = val,
None => return Err(()),
},
ContentLength::Head => return Err(()),
_ => {}
_ => {},
}
Ok(())

View File

@@ -66,14 +66,14 @@ where
pub fn new(config: Config) -> Self {
Streams {
inner: Arc::new(Mutex::new(Inner {
counts: Counts::new(&config),
actions: Actions {
recv: Recv::new(&config),
send: Send::new(&config),
task: None,
},
store: Store::new(),
})),
counts: Counts::new(&config),
actions: Actions {
recv: Recv::new(&config),
send: Send::new(&config),
task: None,
},
store: Store::new(),
})),
}
}
@@ -85,27 +85,29 @@ where
let key = match me.store.find_entry(id) {
Entry::Occupied(e) => e.key(),
Entry::Vacant(e) => {
match me.actions.recv.open(id, &mut me.counts)? {
Some(stream_id) => {
let stream = Stream::new(stream_id,
me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz());
Entry::Vacant(e) => match me.actions.recv.open(id, &mut me.counts)? {
Some(stream_id) => {
let stream = Stream::new(
stream_id,
me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz(),
);
e.insert(stream)
}
None => return Ok(()),
}
}
e.insert(stream)
},
None => return Ok(()),
},
};
let stream = me.store.resolve(key);
let actions = &mut me.actions;
me.counts.transition(stream, |counts, stream| {
trace!("recv_headers; stream={:?}; state={:?}",
stream.id,
stream.state);
trace!(
"recv_headers; stream={:?}; state={:?}",
stream.id,
stream.state
);
let res = if stream.state.is_recv_headers() {
actions.recv.recv_headers(frame, stream, counts)
@@ -160,7 +162,7 @@ where
.map_err(RecvError::Connection)?;
return Ok(());
}
},
};
let actions = &mut me.actions;
@@ -211,11 +213,11 @@ where
// This result is ignored as there is nothing to do when there
// is an error. The stream is reset by the function on error and
// the error is informational.
let _ = me.actions
.send
.recv_stream_window_update(frame.size_increment(),
&mut stream,
&mut me.actions.task);
let _ = me.actions.send.recv_stream_window_update(
frame.size_increment(),
&mut stream,
&mut me.actions.task,
);
} else {
me.actions
.recv
@@ -255,7 +257,7 @@ where
// Return the key
Some(key)
}
},
None => None,
}
};
@@ -294,9 +296,11 @@ where
try_ready!(me.actions.recv.poll_complete(&mut me.store, dst));
// Send any other pending frames
try_ready!(me.actions
.send
.poll_complete(&mut me.store, &mut me.counts, dst));
try_ready!(me.actions.send.poll_complete(
&mut me.store,
&mut me.counts,
dst
));
// Nothing else to do, track the task
me.actions.task = Some(task::current());
@@ -335,9 +339,11 @@ where
// Initialize a new stream. This fails if the connection is at capacity.
let stream_id = me.actions.send.open(&mut me.counts)?;
let mut stream = Stream::new(stream_id,
me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz());
let mut stream = Stream::new(
stream_id,
me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz(),
);
if *request.method() == Method::HEAD {
stream.content_length = ContentLength::Head;
@@ -363,9 +369,9 @@ where
};
Ok(StreamRef {
inner: self.inner.clone(),
key: key,
})
inner: self.inner.clone(),
key: key,
})
}
pub fn send_reset(&mut self, id: StreamId, reason: Reason) {
@@ -374,16 +380,14 @@ where
let key = match me.store.find_entry(id) {
Entry::Occupied(e) => e.key(),
Entry::Vacant(e) => {
match me.actions.recv.open(id, &mut me.counts) {
Ok(Some(stream_id)) => {
let stream = Stream::new(stream_id, 0, 0);
Entry::Vacant(e) => match me.actions.recv.open(id, &mut me.counts) {
Ok(Some(stream_id)) => {
let stream = Stream::new(stream_id, 0, 0);
e.insert(stream)
}
_ => return,
}
}
e.insert(stream)
},
_ => return,
},
};
let stream = me.store.resolve(key);
@@ -651,8 +655,8 @@ where
res: Result<(), RecvError>,
) -> Result<(), RecvError> {
if let Err(RecvError::Stream {
reason, ..
}) = res
reason, ..
}) = res
{
// Reset the stream.
self.send.send_reset(reason, stream, &mut self.task);