Update crate to Rust 2018 (#383)
Committed by: Sean McArthur
Parent: b3351e675b
Commit: db6b841e67
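The changes below are mechanical: crate-internal imports gain an explicit `crate::` prefix, macros from the `log` crate are invoked by path (`log::trace!`) instead of relying on `#[macro_use]`, `try_ready!` is imported from `futures` like any other item, and the method name `dyn` moves to the raw-identifier form `r#dyn`. A minimal self-contained sketch of the import and macro pattern (the module and names are made up for illustration, not taken from this commit; it assumes a dependency on the `log` crate):

```rust
// Rust 2015 wrote `use my_mod::Reason;` at the crate root and pulled macros
// in with `#[macro_use] extern crate log;`. In the 2018 edition the same
// code uses explicit `crate::` paths and path-qualified macros.
mod my_mod {
    #[derive(Debug)]
    pub struct Reason(pub u32);
}

use crate::my_mod::Reason; // hypothetical module, standing in for `frame`, `proto`, etc.

fn send_reset(reason: Reason) {
    // Macro invoked through its crate path; no #[macro_use] needed.
    log::trace!("send_reset; reason={:?}", reason);
}

fn main() {
    send_reset(Reason(0));
}
```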
@@ -1,12 +1,12 @@
-use {client, frame, proto, server};
-use codec::RecvError;
-use frame::{Reason, StreamId};
+use crate::{client, frame, proto, server};
+use crate::codec::RecvError;
+use crate::frame::{Reason, StreamId};

-use frame::DEFAULT_INITIAL_WINDOW_SIZE;
-use proto::*;
+use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE;
+use crate::proto::*;

 use bytes::{Bytes, IntoBuf};
-use futures::Stream;
+use futures::{Stream, try_ready};
 use tokio_io::{AsyncRead, AsyncWrite};

 use std::marker::PhantomData;
@@ -193,7 +193,7 @@ where

 /// Advances the internal state of the connection.
 pub fn poll(&mut self) -> Poll<(), proto::Error> {
-use codec::RecvError::*;
+use crate::codec::RecvError::*;

 loop {
 // TODO: probably clean up this glob of code
@@ -223,13 +223,13 @@ where
 // error. This is handled by setting a GOAWAY frame followed by
 // terminating the connection.
 Err(Connection(e)) => {
-debug!("Connection::poll; connection error={:?}", e);
+log::debug!("Connection::poll; connection error={:?}", e);

 // We may have already sent a GOAWAY for this error,
 // if so, don't send another, just flush and close up.
 if let Some(reason) = self.go_away.going_away_reason() {
 if reason == e {
-trace!(" -> already going away");
+log::trace!(" -> already going away");
 self.state = State::Closing(e);
 continue;
 }
@@ -246,7 +246,7 @@ where
 id,
 reason,
 }) => {
-trace!("stream error; id={:?}; reason={:?}", id, reason);
+log::trace!("stream error; id={:?}; reason={:?}", id, reason);
 self.streams.send_reset(id, reason);
 },
 // Attempting to read a frame resulted in an I/O error. All
@@ -254,7 +254,7 @@ where
 //
 // TODO: Are I/O errors recoverable?
 Err(Io(e)) => {
-debug!("Connection::poll; IO error={:?}", e);
+log::debug!("Connection::poll; IO error={:?}", e);
 let e = e.into();

 // Reset all active streams
@@ -266,7 +266,7 @@ where
 }
 }
 State::Closing(reason) => {
-trace!("connection closing after flush");
+log::trace!("connection closing after flush");
 // Flush/shutdown the codec
 try_ready!(self.codec.shutdown());

@@ -279,7 +279,7 @@ where
 }

 fn poll2(&mut self) -> Poll<(), RecvError> {
-use frame::Frame::*;
+use crate::frame::Frame::*;

 // This happens outside of the loop to prevent needing to do a clock
 // check and then comparison of the queue possibly multiple times a
@@ -309,27 +309,27 @@ where

 match try_ready!(self.codec.poll()) {
 Some(Headers(frame)) => {
-trace!("recv HEADERS; frame={:?}", frame);
+log::trace!("recv HEADERS; frame={:?}", frame);
 self.streams.recv_headers(frame)?;
 },
 Some(Data(frame)) => {
-trace!("recv DATA; frame={:?}", frame);
+log::trace!("recv DATA; frame={:?}", frame);
 self.streams.recv_data(frame)?;
 },
 Some(Reset(frame)) => {
-trace!("recv RST_STREAM; frame={:?}", frame);
+log::trace!("recv RST_STREAM; frame={:?}", frame);
 self.streams.recv_reset(frame)?;
 },
 Some(PushPromise(frame)) => {
-trace!("recv PUSH_PROMISE; frame={:?}", frame);
+log::trace!("recv PUSH_PROMISE; frame={:?}", frame);
 self.streams.recv_push_promise(frame)?;
 },
 Some(Settings(frame)) => {
-trace!("recv SETTINGS; frame={:?}", frame);
+log::trace!("recv SETTINGS; frame={:?}", frame);
 self.settings.recv_settings(frame);
 },
 Some(GoAway(frame)) => {
-trace!("recv GOAWAY; frame={:?}", frame);
+log::trace!("recv GOAWAY; frame={:?}", frame);
 // This should prevent starting new streams,
 // but should allow continuing to process current streams
 // until they are all EOS. Once they are, State should
@@ -338,7 +338,7 @@ where
 self.error = Some(frame.reason());
 },
 Some(Ping(frame)) => {
-trace!("recv PING; frame={:?}", frame);
+log::trace!("recv PING; frame={:?}", frame);
 let status = self.ping_pong.recv_ping(frame);
 if status.is_shutdown() {
 assert!(
@@ -351,15 +351,15 @@ where
 }
 },
 Some(WindowUpdate(frame)) => {
-trace!("recv WINDOW_UPDATE; frame={:?}", frame);
+log::trace!("recv WINDOW_UPDATE; frame={:?}", frame);
 self.streams.recv_window_update(frame)?;
 },
 Some(Priority(frame)) => {
-trace!("recv PRIORITY; frame={:?}", frame);
+log::trace!("recv PRIORITY; frame={:?}", frame);
 // TODO: handle
 },
 None => {
-trace!("codec closed");
+log::trace!("codec closed");
 self.streams.recv_eof(false)
 .ok().expect("mutex poisoned");
 return Ok(Async::Ready(()));
@@ -1,5 +1,5 @@
-use codec::{RecvError, SendError};
-use frame::Reason;
+use crate::codec::{RecvError, SendError};
+use crate::frame::Reason;

 use std::io;

@@ -1,5 +1,5 @@
-use codec::Codec;
-use frame::{self, Reason, StreamId};
+use crate::codec::Codec;
+use crate::frame::{self, Reason, StreamId};

 use bytes::Buf;
 use futures::{Async, Poll};
@@ -13,13 +13,13 @@ pub(crate) use self::ping_pong::UserPings;
 pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams};
 pub(crate) use self::streams::{PollReset, Prioritized, Open};

-use codec::Codec;
+use crate::codec::Codec;

 use self::go_away::GoAway;
 use self::ping_pong::PingPong;
 use self::settings::Settings;

-use frame::{self, Frame};
+use crate::frame::{self, Frame};

 use futures::{task, Async, Poll};
 use futures::task::Task;
@@ -1,7 +1,7 @@
-use codec::RecvError;
-use error::Reason;
-use frame::{Pseudo, StreamId};
-use proto::Open;
+use crate::codec::RecvError;
+use crate::error::Reason;
+use crate::frame::{Pseudo, StreamId};
+use crate::proto::Open;

 use http::{HeaderMap, Request, Response};

@@ -12,7 +12,7 @@ pub(crate) trait Peer {
 /// Message type polled from the transport
 type Poll: fmt::Debug;

-fn dyn() -> Dyn;
+fn r#dyn() -> Dyn;

 fn is_server() -> bool;

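One non-mechanical rename: `dyn` became a reserved keyword in the 2018 edition, so the `Peer::dyn()` constructor keeps its name only through the raw-identifier syntax `r#dyn`, as the hunk above shows. A standalone sketch of that syntax (the trait and types here are illustrative stand-ins, not this crate's definitions):

```rust
// `dyn` is a keyword in Rust 2018; prefixing it with `r#` lets it keep
// serving as an ordinary identifier. All names below are made up.
struct Dyn;

trait Peer {
    fn r#dyn() -> Dyn;
}

struct Client;

impl Peer for Client {
    fn r#dyn() -> Dyn {
        Dyn
    }
}

fn main() {
    // Call sites use the raw-identifier form as well.
    let _peer = Client::r#dyn();
}
```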
@@ -57,10 +57,10 @@ impl Dyn {
 &self, pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId
 ) -> Result<PollMessage, RecvError> {
 if self.is_server() {
-::server::Peer::convert_poll_message(pseudo, fields, stream_id)
+crate::server::Peer::convert_poll_message(pseudo, fields, stream_id)
 .map(PollMessage::Server)
 } else {
-::client::Peer::convert_poll_message(pseudo, fields, stream_id)
+crate::client::Peer::convert_poll_message(pseudo, fields, stream_id)
 .map(PollMessage::Client)
 }
 }
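The fully qualified call paths in this hunk change for a related reason: in the 2018 edition a path beginning with `::` names an external crate, so absolute paths into the crate itself (`::server::Peer::...`, and later `::Error`) become `crate::server::Peer::...` and `crate::Error`. A small self-contained illustration (the `util` module is hypothetical):

```rust
// Rust 2015 allowed `::util::helper()` to mean "helper at this crate's root".
// In Rust 2018 a leading `::` is reserved for external crates, so the same
// item is reached with `crate::util::helper()` instead.
mod util {
    pub fn helper() -> u32 {
        42
    }
}

fn call_helper() -> u32 {
    crate::util::helper()
}

fn main() {
    assert_eq!(call_helper(), 42);
}
```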
@@ -1,6 +1,6 @@
-use codec::Codec;
-use frame::Ping;
-use proto::{self, PingPayload};
+use crate::codec::Codec;
+use crate::frame::Ping;
+use crate::proto::{self, PingPayload};

 use bytes::Buf;
 use futures::{Async, Poll};
@@ -107,7 +107,7 @@ impl PingPong {
 &Ping::SHUTDOWN,
 "pending_ping should be for shutdown",
 );
-trace!("recv PING SHUTDOWN ack");
+log::trace!("recv PING SHUTDOWN ack");
 return ReceivedPing::Shutdown;
 }

@@ -117,7 +117,7 @@ impl PingPong {

 if let Some(ref users) = self.user_pings {
 if ping.payload() == &Ping::USER && users.receive_pong() {
-trace!("recv PING USER ack");
+log::trace!("recv PING USER ack");
 return ReceivedPing::Unknown;
 }
 }
@@ -125,7 +125,7 @@ impl PingPong {
 // else we were acked a ping we didn't send?
 // The spec doesn't require us to do anything about this,
 // so for resiliency, just ignore it for now.
-warn!("recv PING ack that we never sent: {:?}", ping);
+log::warn!("recv PING ack that we never sent: {:?}", ping);
 ReceivedPing::Unknown
 } else {
 // Save the ping's payload to be sent as an acknowledgement.
@@ -1,6 +1,6 @@
-use codec::RecvError;
-use frame;
-use proto::*;
+use crate::codec::RecvError;
+use crate::frame;
+use crate::proto::*;

 #[derive(Debug)]
 pub(crate) struct Settings {
@@ -19,7 +19,7 @@ impl Settings {

 pub fn recv_settings(&mut self, frame: frame::Settings) {
 if frame.is_ack() {
-debug!("received remote settings ack");
+log::debug!("received remote settings ack");
 // TODO: handle acks
 } else {
 assert!(self.pending.is_none());
@@ -38,11 +38,11 @@ impl Settings {
 C: Buf,
 P: Peer,
 {
-trace!("send_pending_ack; pending={:?}", self.pending);
+log::trace!("send_pending_ack; pending={:?}", self.pending);

 if let Some(ref settings) = self.pending {
 if !dst.poll_ready()?.is_ready() {
-trace!("failed to send ACK");
+log::trace!("failed to send ACK");
 return Ok(Async::NotReady);
 }

@@ -54,7 +54,7 @@ impl Settings {
 .ok()
 .expect("invalid settings frame");

-trace!("ACK sent; applying settings");
+log::trace!("ACK sent; applying settings");

 if let Some(val) = settings.max_frame_size() {
 dst.set_max_send_frame_size(val as usize);
@@ -133,7 +133,7 @@ impl Counts {

 // TODO: move this to macro?
 pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
-trace!("transition_after; stream={:?}; state={:?}; is_closed={:?}; \
+log::trace!("transition_after; stream={:?}; state={:?}; is_closed={:?}; \
 pending_send_empty={:?}; buffered_send_data={}; \
 num_recv={}; num_send={}",
 stream.id,
@@ -153,7 +153,7 @@ impl Counts {
 }

 if stream.is_counted {
-trace!("dec_num_streams; stream={:?}", stream.id);
+log::trace!("dec_num_streams; stream={:?}", stream.id);
 // Decrement the number of active streams.
 self.dec_num_streams(&mut stream);
 }
@@ -1,5 +1,5 @@
-use frame::Reason;
-use proto::{WindowSize, MAX_WINDOW_SIZE};
+use crate::frame::Reason;
+use crate::proto::{WindowSize, MAX_WINDOW_SIZE};

 use std::fmt;

@@ -120,7 +120,7 @@ impl FlowControl {
 return Err(Reason::FLOW_CONTROL_ERROR);
 }

-trace!(
+log::trace!(
 "inc_window; sz={}; old={}; new={}",
 sz,
 self.window_size,
@@ -136,7 +136,7 @@ impl FlowControl {
 /// This is called after receiving a SETTINGS frame with a lower
 /// INITIAL_WINDOW_SIZE value.
 pub fn dec_window(&mut self, sz: WindowSize) {
-trace!(
+log::trace!(
 "dec_window; sz={}; window={}, available={}",
 sz,
 self.window_size,
@@ -149,7 +149,7 @@ impl FlowControl {
 /// Decrements the window reflecting data has actually been sent. The caller
 /// must ensure that the window has capacity.
 pub fn send_data(&mut self, sz: WindowSize) {
-trace!(
+log::trace!(
 "send_data; sz={}; window={}; available={}",
 sz,
 self.window_size,
@@ -24,8 +24,8 @@ use self::state::State;
 use self::store::Store;
 use self::stream::Stream;

-use frame::{StreamId, StreamIdOverflow};
-use proto::*;
+use crate::frame::{StreamId, StreamIdOverflow};
+use crate::proto::*;

 use bytes::Bytes;
 use std::time::Duration;
@@ -1,12 +1,13 @@
 use super::*;
 use super::store::Resolve;

-use frame::{Reason, StreamId};
+use crate::frame::{Reason, StreamId};

-use codec::UserError;
-use codec::UserError::*;
+use crate::codec::UserError;
+use crate::codec::UserError::*;

 use bytes::buf::Take;
+use futures::try_ready;

 use std::{cmp, fmt, mem};
 use std::io;
@@ -85,7 +86,7 @@ impl Prioritize {

 flow.assign_capacity(config.remote_init_window_sz);

-trace!("Prioritize::new; flow={:?}", flow);
+log::trace!("Prioritize::new; flow={:?}", flow);

 Prioritize {
 pending_send: store::Queue::new(),
@@ -113,7 +114,7 @@ impl Prioritize {
 pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Task>) {
 // If the stream is waiting to be opened, nothing more to do.
 if !stream.is_pending_open {
-trace!("schedule_send; {:?}", stream.id);
+log::trace!("schedule_send; {:?}", stream.id);
 // Queue the stream
 self.pending_send.push(stream);

@@ -159,7 +160,7 @@ impl Prioritize {
 // Update the buffered data counter
 stream.buffered_send_data += sz;

-trace!(
+log::trace!(
 "send_data; sz={}; buffered={}; requested={}",
 sz,
 stream.buffered_send_data,
@@ -180,7 +181,7 @@ impl Prioritize {
 self.reserve_capacity(0, stream, counts);
 }

-trace!(
+log::trace!(
 "send_data (2); available={}; buffered={}",
 stream.send_flow.available(),
 stream.buffered_send_data
@@ -216,7 +217,7 @@ impl Prioritize {
 capacity: WindowSize,
 stream: &mut store::Ptr,
 counts: &mut Counts) {
-trace!(
+log::trace!(
 "reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
 stream.id,
 capacity,
@@ -268,7 +269,7 @@ impl Prioritize {
 inc: WindowSize,
 stream: &mut store::Ptr,
 ) -> Result<(), Reason> {
-trace!(
+log::trace!(
 "recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
 stream.id,
 stream.state,
@@ -328,7 +329,7 @@ impl Prioritize {
 pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
 while let Some(stream) = self.pending_capacity.pop(store) {
 counts.transition(stream, |_, stream| {
-trace!("clear_pending_capacity; stream={:?}", stream.id);
+log::trace!("clear_pending_capacity; stream={:?}", stream.id);
 })
 }
 }
@@ -341,7 +342,7 @@ impl Prioritize {
 where
 R: Resolve,
 {
-trace!("assign_connection_capacity; inc={}", inc);
+log::trace!("assign_connection_capacity; inc={}", inc);

 self.flow.assign_capacity(inc);

@@ -385,7 +386,7 @@ impl Prioritize {
 stream.send_flow.window_size() - stream.send_flow.available().as_size(),
 );

-trace!(
+log::trace!(
 "try_assign_capacity; stream={:?}, requested={}; additional={}; buffered={}; window={}; conn={}",
 stream.id,
 total_requested,
@@ -418,7 +419,7 @@ impl Prioritize {
 // TODO: Should prioritization factor into this?
 let assign = cmp::min(conn_available, additional);

-trace!(
+log::trace!(
 " assigning; stream={:?}, capacity={}",
 stream.id,
 assign,
@@ -431,7 +432,7 @@ impl Prioritize {
 self.flow.claim_capacity(assign);
 }

-trace!(
+log::trace!(
 "try_assign_capacity(2); available={}; requested={}; buffered={}; has_unavailable={:?}",
 stream.send_flow.available(),
 stream.requested_send_capacity,
@@ -500,14 +501,14 @@ impl Prioritize {
 // The max frame length
 let max_frame_len = dst.max_send_frame_size();

-trace!("poll_complete");
+log::trace!("poll_complete");

 loop {
 self.schedule_pending_open(store, counts);

 match self.pop_frame(buffer, store, max_frame_len, counts) {
 Some(frame) => {
-trace!("writing frame={:?}", frame);
+log::trace!("writing frame={:?}", frame);

 debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
 if let Frame::Data(ref frame) = frame {
@@ -553,11 +554,11 @@ impl Prioritize {
 where
 B: Buf,
 {
-trace!("try reclaim frame");
+log::trace!("try reclaim frame");

 // First check if there are any data chunks to take back
 if let Some(frame) = dst.take_last_data_frame() {
-trace!(
+log::trace!(
 " -> reclaimed; frame={:?}; sz={}",
 frame,
 frame.payload().inner.get_ref().remaining()
@@ -569,7 +570,7 @@ impl Prioritize {
 match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
 InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
 InFlightData::Drop => {
-trace!("not reclaiming frame for cancelled stream");
+log::trace!("not reclaiming frame for cancelled stream");
 return false;
 }
 InFlightData::DataFrame(k) => {
@@ -617,11 +618,11 @@ impl Prioritize {
 }

 pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
-trace!("clear_queue; stream={:?}", stream.id);
+log::trace!("clear_queue; stream={:?}", stream.id);

 // TODO: make this more efficient?
 while let Some(frame) = stream.pending_send.pop_front(buffer) {
-trace!("dropping; frame={:?}", frame);
+log::trace!("dropping; frame={:?}", frame);
 }

 stream.buffered_send_data = 0;
@@ -658,12 +659,12 @@ impl Prioritize {
 where
 B: Buf,
 {
-trace!("pop_frame");
+log::trace!("pop_frame");

 loop {
 match self.pending_send.pop(store) {
 Some(mut stream) => {
-trace!("pop_frame; stream={:?}; stream.state={:?}",
+log::trace!("pop_frame; stream={:?}; stream.state={:?}",
 stream.id, stream.state);

 // It's possible that this stream, besides having data to send,
@@ -673,7 +674,7 @@ impl Prioritize {
 // To be safe, we just always ask the stream.
 let is_pending_reset = stream.is_pending_reset_expiration();

-trace!(" --> stream={:?}; is_pending_reset={:?};",
+log::trace!(" --> stream={:?}; is_pending_reset={:?};",
 stream.id, is_pending_reset);

 let frame = match stream.pending_send.pop_front(buffer) {
@@ -683,7 +684,7 @@ impl Prioritize {
 let stream_capacity = stream.send_flow.available();
 let sz = frame.payload().remaining();

-trace!(
+log::trace!(
 " --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \
 available={}; requested={}; buffered={};",
 frame.stream_id(),
@@ -698,7 +699,7 @@ impl Prioritize {
 // Zero length data frames always have capacity to
 // be sent.
 if sz > 0 && stream_capacity == 0 {
-trace!(
+log::trace!(
 " --> stream capacity is 0; requested={}",
 stream.requested_send_capacity
 );
@@ -730,10 +731,10 @@ impl Prioritize {
 // capacity at this point.
 debug_assert!(len <= self.flow.window_size());

-trace!(" --> sending data frame; len={}", len);
+log::trace!(" --> sending data frame; len={}", len);

 // Update the flow control
-trace!(" -- updating stream flow --");
+log::trace!(" -- updating stream flow --");
 stream.send_flow.send_data(len);

 // Decrement the stream's buffered data counter
@@ -746,7 +747,7 @@ impl Prioritize {
 // line.
 self.flow.assign_capacity(len);

-trace!(" -- updating connection flow --");
+log::trace!(" -- updating connection flow --");
 self.flow.send_data(len);

 // Wrap the frame's data payload to ensure that the
@@ -784,7 +785,7 @@ impl Prioritize {
 // had data buffered to be sent, but all the frames are cleared
 // in clear_queue(). Instead of doing O(N) traversal through queue
 // to remove, lets just ignore the stream here.
-trace!("removing dangling stream from pending_send");
+log::trace!("removing dangling stream from pending_send");
 // Since this should only happen as a consequence of `clear_queue`,
 // we must be in a closed state of some kind.
 debug_assert!(stream.state.is_closed());
@@ -794,7 +795,7 @@ impl Prioritize {
 }
 };

-trace!("pop_frame; frame={:?}", frame);
+log::trace!("pop_frame; frame={:?}", frame);

 if cfg!(debug_assertions) && stream.state.is_idle() {
 debug_assert!(stream.id > self.last_opened_id);
@@ -819,11 +820,11 @@ impl Prioritize {
 }

 fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
-trace!("schedule_pending_open");
+log::trace!("schedule_pending_open");
 // check for any pending open streams
 while counts.can_inc_num_send_streams() {
 if let Some(mut stream) = self.pending_open.pop(store) {
-trace!("schedule_pending_open; stream={:?}", stream.id);
+log::trace!("schedule_pending_open; stream={:?}", stream.id);

 counts.inc_num_send_streams(&mut stream);
 self.pending_send.push(&mut stream);
@@ -1,9 +1,10 @@
 use super::*;
-use {frame, proto};
-use codec::{RecvError, UserError};
-use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE};
+use crate::{frame, proto};
+use crate::codec::{RecvError, UserError};
+use crate::frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE};

 use http::{HeaderMap, Response, Request, Method};
+use futures::try_ready;

 use std::io;
 use std::time::{Duration, Instant};
@@ -158,7 +159,7 @@ impl Recv {
 stream: &mut store::Ptr,
 counts: &mut Counts,
 ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
-trace!("opening stream; init_window={}", self.init_window_sz);
+log::trace!("opening stream; init_window={}", self.init_window_sz);
 let is_initial = stream.state.recv_open(frame.is_end_stream())?;

 if is_initial {
@@ -203,7 +204,7 @@ impl Recv {
 // So, if peer is a server, we'll send a 431. In either case,
 // an error is recorded, which will send a REFUSED_STREAM,
 // since we don't want any of the data frames either.
-debug!(
+log::debug!(
 "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
 recv_headers: frame is over size; stream={:?}",
 stream.id
@@ -340,7 +341,7 @@ impl Recv {
 capacity: WindowSize,
 task: &mut Option<Task>,
 ) {
-trace!(
+log::trace!(
 "release_connection_capacity; size={}, connection in_flight_data={}",
 capacity,
 self.in_flight_data,
@@ -366,7 +367,7 @@ impl Recv {
 stream: &mut store::Ptr,
 task: &mut Option<Task>,
 ) -> Result<(), UserError> {
-trace!("release_capacity; size={}", capacity);
+log::trace!("release_capacity; size={}", capacity);

 if capacity > stream.in_flight_recv_data {
 return Err(UserError::ReleaseCapacityTooBig);
@@ -405,7 +406,7 @@ impl Recv {
 return;
 }

-trace!(
+log::trace!(
 "auto-release closed stream ({:?}) capacity: {:?}",
 stream.id,
 stream.in_flight_recv_data,
@@ -433,7 +434,7 @@ impl Recv {
 /// The `task` is an optional parked task for the `Connection` that might
 /// be blocked on needing more window capacity.
 pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Task>) {
-trace!(
+log::trace!(
 "set_target_connection_window; target={}; available={}, reserved={}",
 target,
 self.flow.available(),
@@ -509,7 +510,7 @@ impl Recv {
 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
 }

-trace!(
+log::trace!(
 "recv_data; size={}; connection={}; stream={}",
 sz,
 self.flow.window_size(),
@@ -518,7 +519,7 @@ impl Recv {


 if is_ignoring_frame {
-trace!(
+log::trace!(
 "recv_data; frame ignored on locally reset {:?} for some time",
 stream.id,
 );
@@ -608,7 +609,7 @@ impl Recv {

 pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> {
 if self.flow.window_size() < sz {
-debug!(
+log::debug!(
 "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
 self.flow.window_size(),
 sz,
@@ -642,7 +643,7 @@ impl Recv {
 // So, if peer is a server, we'll send a 431. In either case,
 // an error is recorded, which will send a REFUSED_STREAM,
 // since we don't want any of the data frames either.
-debug!(
+log::debug!(
 "stream error REFUSED_STREAM -- recv_push_promise: \
 headers frame is over size; promised_id={:?};",
 frame.promised_id(),
@@ -656,7 +657,7 @@ impl Recv {
 let promised_id = frame.promised_id();
 use http::header;
 let (pseudo, fields) = frame.into_parts();
-let req = ::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
+let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
 // The spec has some requirements for promised request headers
 // [https://httpwg.org/specs/rfc7540.html#PushRequests]

@@ -708,7 +709,7 @@ impl Recv {
 pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
 if let Ok(next) = self.next_stream_id {
 if id >= next {
-debug!("stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}", id);
+log::debug!("stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}", id);
 return Err(Reason::PROTOCOL_ERROR);
 }
 }
@@ -803,7 +804,7 @@ impl Recv {
 return;
 }

-trace!("enqueue_reset_expiration; {:?}", stream.id);
+log::trace!("enqueue_reset_expiration; {:?}", stream.id);

 if !counts.can_inc_num_reset_streams() {
 // try to evict 1 stream if possible
@@ -873,7 +874,7 @@ impl Recv {
 fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
 while let Some(stream) = self.pending_window_updates.pop(store) {
 counts.transition(stream, |_, stream| {
-trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
+log::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
 })
 }
 }
@@ -962,7 +963,7 @@ impl Recv {
 };

 counts.transition(stream, |_, stream| {
-trace!("pending_window_updates -- pop; stream={:?}", stream.id);
+log::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
 debug_assert!(!stream.is_pending_window_update);

 if !stream.state.is_recv_streaming() {
@@ -1,5 +1,5 @@
-use codec::{RecvError, UserError};
-use frame::{self, Reason};
+use crate::codec::{RecvError, UserError};
+use crate::frame::{self, Reason};
 use super::{
 store, Buffer, Codec, Config, Counts, Frame, Prioritize,
 Prioritized, Store, Stream, StreamId, StreamIdOverflow, WindowSize,
@@ -62,7 +62,7 @@ impl Send {
 counts: &mut Counts,
 task: &mut Option<Task>,
 ) -> Result<(), UserError> {
-trace!(
+log::trace!(
 "send_headers; frame={:?}; init_window={:?}",
 frame,
 self.init_window_sz
@@ -75,11 +75,11 @@ impl Send {
 || frame.fields().contains_key("keep-alive")
 || frame.fields().contains_key("proxy-connection")
 {
-debug!("illegal connection-specific headers found");
+log::debug!("illegal connection-specific headers found");
 return Err(UserError::MalformedHeaders);
 } else if let Some(te) = frame.fields().get(http::header::TE) {
 if te != "trailers" {
-debug!("illegal connection-specific headers found");
+log::debug!("illegal connection-specific headers found");
 return Err(UserError::MalformedHeaders);

 }
@@ -121,7 +121,7 @@ impl Send {
 let is_closed = stream.state.is_closed();
 let is_empty = stream.pending_send.is_empty();

-trace!(
+log::trace!(
 "send_reset(..., reason={:?}, stream={:?}, ..., \
 is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
 state={:?} \
@@ -136,7 +136,7 @@ impl Send {

 if is_reset {
 // Don't double reset
-trace!(
+log::trace!(
 " -> not sending RST_STREAM ({:?} is already reset)",
 stream.id
 );
@@ -149,7 +149,7 @@ impl Send {
 // If closed AND the send queue is flushed, then the stream cannot be
 // reset explicitly, either. Implicit resets can still be queued.
 if is_closed && is_empty {
-trace!(
+log::trace!(
 " -> not sending explicit RST_STREAM ({:?} was closed \
 and send queue was flushed)",
 stream.id
@@ -165,7 +165,7 @@ impl Send {

 let frame = frame::Reset::new(stream.id, reason);

-trace!("send_reset -- queueing; frame={:?}", frame);
+log::trace!("send_reset -- queueing; frame={:?}", frame);
 self.prioritize.queue_frame(frame.into(), buffer, stream, task);
 self.prioritize.reclaim_all_capacity(stream, counts);
 }
@@ -220,7 +220,7 @@ impl Send {

 stream.state.send_close();

-trace!("send_trailers -- queuing; frame={:?}", frame);
+log::trace!("send_trailers -- queuing; frame={:?}", frame);
 self.prioritize.queue_frame(frame.into(), buffer, stream, task);

 // Release any excess capacity
@@ -286,7 +286,7 @@ impl Send {
 &self,
 stream: &mut Stream,
 mode: PollReset,
-) -> Poll<Reason, ::Error> {
+) -> Poll<Reason, crate::Error> {
 match stream.state.ensure_reason(mode)? {
 Some(reason) => Ok(reason.into()),
 None => {
@@ -315,7 +315,7 @@ impl Send {
 task: &mut Option<Task>,
 ) -> Result<(), Reason> {
 if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
-debug!("recv_stream_window_update !!; err={:?}", e);
+log::debug!("recv_stream_window_update !!; err={:?}", e);

 self.send_reset(
 Reason::FLOW_CONTROL_ERROR.into(),
@@ -370,7 +370,7 @@ impl Send {
 if val < old_val {
 // We must decrease the (remote) window on every open stream.
 let dec = old_val - val;
-trace!("decrementing all windows; dec={}", dec);
+log::trace!("decrementing all windows; dec={}", dec);

 let mut total_reclaimed = 0;
 store.for_each(|mut stream| {
@@ -396,7 +396,7 @@ impl Send {
 0
 };

-trace!(
+log::trace!(
 "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
 stream.id,
 dec,
@@ -1,9 +1,9 @@
 use std::io;

-use codec::{RecvError, UserError};
-use codec::UserError::*;
-use frame::Reason;
-use proto::{self, PollReset};
+use crate::codec::{RecvError, UserError};
+use crate::codec::UserError::*;
+use crate::frame::Reason;
+use crate::proto::{self, PollReset};

 use self::Inner::*;
 use self::Peer::*;
@@ -203,12 +203,12 @@ impl State {
 local, ..
 } => {
 // The remote side will continue to receive data.
-trace!("recv_close: Open => HalfClosedRemote({:?})", local);
+log::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
 self.inner = HalfClosedRemote(local);
 Ok(())
 },
 HalfClosedLocal(..) => {
-trace!("recv_close: HalfClosedLocal => Closed");
+log::trace!("recv_close: HalfClosedLocal => Closed");
 self.inner = Closed(Cause::EndStream);
 Ok(())
 },
@@ -244,7 +244,7 @@ impl State {
 // previous state with the received RST_STREAM, so that the queue
 // will be cleared by `Prioritize::pop_frame`.
 state => {
-trace!(
+log::trace!(
 "recv_reset; reason={:?}; state={:?}; queued={:?}",
 reason, state, queued
 );
@@ -256,12 +256,12 @@ impl State {

 /// We noticed a protocol error.
 pub fn recv_err(&mut self, err: &proto::Error) {
-use proto::Error::*;
+use crate::proto::Error::*;

 match self.inner {
 Closed(..) => {},
 _ => {
-trace!("recv_err; err={:?}", err);
+log::trace!("recv_err; err={:?}", err);
 self.inner = Closed(match *err {
 Proto(reason) => Cause::LocallyReset(reason),
 Io(..) => Cause::Io,
@@ -274,7 +274,7 @@ impl State {
 match self.inner {
 Closed(..) => {},
 s => {
-trace!("recv_eof; state={:?}", s);
+log::trace!("recv_eof; state={:?}", s);
 self.inner = Closed(Cause::Io);
 }
 }
@@ -287,11 +287,11 @@ impl State {
 remote, ..
 } => {
 // The remote side will continue to receive data.
-trace!("send_close: Open => HalfClosedLocal({:?})", remote);
+log::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
 self.inner = HalfClosedLocal(remote);
 },
 HalfClosedRemote(..) => {
-trace!("send_close: HalfClosedRemote => Closed");
+log::trace!("send_close: HalfClosedRemote => Closed");
 self.inner = Closed(Cause::EndStream);
 },
 state => panic!("send_close: unexpected state {:?}", state),
@@ -418,7 +418,7 @@ impl State {
 }

 /// Returns a reason if the stream has been reset.
-pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, ::Error> {
+pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
 match self.inner {
 Closed(Cause::Proto(reason)) |
 Closed(Cause::LocallyReset(reason)) |
@@ -244,10 +244,10 @@ where
 ///
 /// If the stream is already contained by the list, return `false`.
 pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
-trace!("Queue::push");
+log::trace!("Queue::push");

 if N::is_queued(stream) {
-trace!(" -> already queued");
+log::trace!(" -> already queued");
 return false;
 }

@@ -259,7 +259,7 @@ where
 // Queue the stream
 match self.indices {
 Some(ref mut idxs) => {
-trace!(" -> existing entries");
+log::trace!(" -> existing entries");

 // Update the current tail node to point to `stream`
 let key = stream.key();
@@ -269,7 +269,7 @@ where
 idxs.tail = stream.key();
 },
 None => {
-trace!(" -> first entry");
+log::trace!(" -> first entry");
 self.indices = Some(store::Indices {
 head: stream.key(),
 tail: stream.key(),
@@ -246,12 +246,12 @@ impl Stream {
 self.send_capacity_inc = true;
 self.send_flow.assign_capacity(capacity);

-trace!(" assigned capacity to stream; available={}; buffered={}; id={:?}",
+log::trace!(" assigned capacity to stream; available={}; buffered={}; id={:?}",
 self.send_flow.available(), self.buffered_send_data, self.id);

 // Only notify if the capacity exceeds the amount of buffered data
 if self.send_flow.available() > self.buffered_send_data {
-trace!(" notifying task");
+log::trace!(" notifying task");
 self.notify_send();
 }
 }
@@ -1,13 +1,13 @@
-use {client, proto, server};
-use codec::{Codec, RecvError, SendError, UserError};
-use frame::{self, Frame, Reason};
-use proto::{peer, Peer, Open, WindowSize};
+use crate::{client, proto, server};
+use crate::codec::{Codec, RecvError, SendError, UserError};
+use crate::frame::{self, Frame, Reason};
+use crate::proto::{peer, Peer, Open, WindowSize};
 use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
 use super::recv::RecvHeaderBlockError;
 use super::store::{self, Entry, Resolve, Store};

 use bytes::{Buf, Bytes};
-use futures::{task, Async, Poll};
+use futures::{task, Async, Poll, try_ready};
 use http::{HeaderMap, Request, Response};
 use tokio_io::AsyncWrite;

@@ -97,7 +97,7 @@ where
 P: Peer,
 {
 pub fn new(config: Config) -> Self {
-let peer = P::dyn();
+let peer = P::r#dyn();

 Streams {
 inner: Arc::new(Mutex::new(Inner {
@@ -134,7 +134,7 @@ where
 // The GOAWAY process has begun. All streams with a greater ID than
 // specified as part of GOAWAY should be ignored.
 if id > me.actions.recv.max_stream_id() {
-trace!("id ({:?}) > max_stream_id ({:?}), ignoring HEADERS", id, me.actions.recv.max_stream_id());
+log::trace!("id ({:?}) > max_stream_id ({:?}), ignoring HEADERS", id, me.actions.recv.max_stream_id());
 return Ok(());
 }

@@ -150,7 +150,7 @@ where
 // This may be response headers for a stream we've already
 // forgotten about...
 if me.actions.may_have_forgotten_stream::<P>(id) {
-debug!(
+log::debug!(
 "recv_headers for old stream={:?}, sending STREAM_CLOSED",
 id,
 );
@@ -182,7 +182,7 @@ where
 // Locally reset streams must ignore frames "for some time".
 // This is because the remote may have sent trailers before
 // receiving the RST_STREAM frame.
-trace!("recv_headers; ignoring trailers on {:?}", stream.id);
+log::trace!("recv_headers; ignoring trailers on {:?}", stream.id);
 return Ok(());
 }

@@ -191,7 +191,7 @@ where
 let send_buffer = &mut *send_buffer;

 me.counts.transition(stream, |counts, stream| {
-trace!(
+log::trace!(
 "recv_headers; stream={:?}; state={:?}",
 stream.id,
 stream.state
@@ -254,12 +254,12 @@ where
 // The GOAWAY process has begun. All streams with a greater ID
 // than specified as part of GOAWAY should be ignored.
 if id > me.actions.recv.max_stream_id() {
-trace!("id ({:?}) > max_stream_id ({:?}), ignoring DATA", id, me.actions.recv.max_stream_id());
+log::trace!("id ({:?}) > max_stream_id ({:?}), ignoring DATA", id, me.actions.recv.max_stream_id());
 return Ok(());
 }

 if me.actions.may_have_forgotten_stream::<P>(id) {
-debug!(
+log::debug!(
 "recv_data for old stream={:?}, sending STREAM_CLOSED",
 id,
 );
@@ -314,7 +314,7 @@ where
 // The GOAWAY process has begun. All streams with a greater ID than
 // specified as part of GOAWAY should be ignored.
 if id > me.actions.recv.max_stream_id() {
-trace!("id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM", id, me.actions.recv.max_stream_id());
+log::trace!("id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM", id, me.actions.recv.max_stream_id());
 return Ok(());
 }

@@ -470,7 +470,7 @@ where
 // The GOAWAY process has begun. All streams with a greater ID
 // than specified as part of GOAWAY should be ignored.
 if id > me.actions.recv.max_stream_id() {
-trace!("id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE", id, me.actions.recv.max_stream_id());
+log::trace!("id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE", id, me.actions.recv.max_stream_id());
 return Ok(());
 }

@@ -549,7 +549,7 @@ where
 me.refs += 1;
 key.map(|key| {
 let stream = &mut me.store.resolve(key);
-trace!("next_incoming; id={:?}, state={:?}", stream.id, stream.state);
+log::trace!("next_incoming; id={:?}, state={:?}", stream.id, stream.state);
 StreamRef {
 opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
 send_buffer: self.send_buffer.clone(),
@@ -740,7 +740,7 @@ impl<B> Streams<B, client::Peer>
 where
 B: Buf,
 {
-pub fn poll_pending_open(&mut self, pending: Option<&OpaqueStreamRef>) -> Poll<(), ::Error> {
+pub fn poll_pending_open(&mut self, pending: Option<&OpaqueStreamRef>) -> Poll<(), crate::Error> {
 let mut me = self.inner.lock().unwrap();
 let me = &mut *me;

@@ -749,7 +749,7 @@ where

 if let Some(pending) = pending {
 let mut stream = me.store.resolve(pending.key);
-trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
+log::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
 if stream.is_pending_open {
 stream.wait_send();
 return Ok(Async::NotReady);
@@ -779,7 +779,7 @@ where
 actions.conn_error = Some(io::Error::from(io::ErrorKind::BrokenPipe).into());
 }

-trace!("Streams::recv_eof");
+log::trace!("Streams::recv_eof");

 me.store
 .for_each(|stream| {
@@ -979,7 +979,7 @@ impl<B> StreamRef<B> {
 }

 /// Request to be notified for if a `RST_STREAM` is received for this stream.
-pub(crate) fn poll_reset(&mut self, mode: proto::PollReset) -> Poll<Reason, ::Error> {
+pub(crate) fn poll_reset(&mut self, mode: proto::PollReset) -> Poll<Reason, crate::Error> {
 let mut me = self.opaque.inner.lock().unwrap();
 let me = &mut *me;

@@ -1165,7 +1165,7 @@ fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
 let mut me = match inner.lock() {
 Ok(inner) => inner,
 Err(_) => if ::std::thread::panicking() {
-trace!("StreamRef::drop; mutex poisoned");
+log::trace!("StreamRef::drop; mutex poisoned");
 return;
 } else {
 panic!("StreamRef::drop; mutex poisoned");
@@ -1176,7 +1176,7 @@ fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
 me.refs -= 1;
 let mut stream = me.store.resolve(key);

-trace!("drop_stream_ref; stream={:?}", stream);
+log::trace!("drop_stream_ref; stream={:?}", stream);

 // decrement the stream's ref count by 1.
 stream.ref_dec();