feat(lib): switch from log to tracing (#475)
This commit is contained in:
@@ -47,7 +47,7 @@ tokio-util = { version = "0.3.1", features = ["codec"] }
|
|||||||
tokio = { version = "0.2", features = ["io-util"] }
|
tokio = { version = "0.2", features = ["io-util"] }
|
||||||
bytes = "0.5.2"
|
bytes = "0.5.2"
|
||||||
http = "0.2"
|
http = "0.2"
|
||||||
log = "0.4.1"
|
tracing = { version = "0.1.13", default-features = false, features = ["std", "log"] }
|
||||||
fnv = "1.0.5"
|
fnv = "1.0.5"
|
||||||
slab = "0.4.0"
|
slab = "0.4.0"
|
||||||
indexmap = "1.0"
|
indexmap = "1.0"
|
||||||
|
|||||||
@@ -1129,12 +1129,12 @@ where
|
|||||||
mut io: T,
|
mut io: T,
|
||||||
builder: Builder,
|
builder: Builder,
|
||||||
) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
|
) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
|
||||||
log::debug!("binding client connection");
|
tracing::debug!("binding client connection");
|
||||||
|
|
||||||
let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
|
let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
|
||||||
io.write_all(msg).await.map_err(crate::Error::from_io)?;
|
io.write_all(msg).await.map_err(crate::Error::from_io)?;
|
||||||
|
|
||||||
log::debug!("client connection bound");
|
tracing::debug!("client connection bound");
|
||||||
|
|
||||||
// Create the codec
|
// Create the codec
|
||||||
let mut codec = Codec::new(io);
|
let mut codec = Codec::new(io);
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ impl<T> FramedRead<T> {
|
|||||||
fn decode_frame(&mut self, mut bytes: BytesMut) -> Result<Option<Frame>, RecvError> {
|
fn decode_frame(&mut self, mut bytes: BytesMut) -> Result<Option<Frame>, RecvError> {
|
||||||
use self::RecvError::*;
|
use self::RecvError::*;
|
||||||
|
|
||||||
log::trace!("decoding frame from {}B", bytes.len());
|
tracing::trace!("decoding frame from {}B", bytes.len());
|
||||||
|
|
||||||
// Parse the head
|
// Parse the head
|
||||||
let head = frame::Head::parse(&bytes);
|
let head = frame::Head::parse(&bytes);
|
||||||
@@ -74,7 +74,7 @@ impl<T> FramedRead<T> {
|
|||||||
|
|
||||||
let kind = head.kind();
|
let kind = head.kind();
|
||||||
|
|
||||||
log::trace!(" -> kind={:?}", kind);
|
tracing::trace!(" -> kind={:?}", kind);
|
||||||
|
|
||||||
macro_rules! header_block {
|
macro_rules! header_block {
|
||||||
($frame:ident, $head:ident, $bytes:ident) => ({
|
($frame:ident, $head:ident, $bytes:ident) => ({
|
||||||
@@ -124,7 +124,7 @@ impl<T> FramedRead<T> {
|
|||||||
if is_end_headers {
|
if is_end_headers {
|
||||||
frame.into()
|
frame.into()
|
||||||
} else {
|
} else {
|
||||||
log::trace!("loaded partial header block");
|
tracing::trace!("loaded partial header block");
|
||||||
// Defer returning the frame
|
// Defer returning the frame
|
||||||
self.partial = Some(Partial {
|
self.partial = Some(Partial {
|
||||||
frame: Continuable::$frame(frame),
|
frame: Continuable::$frame(frame),
|
||||||
@@ -339,16 +339,16 @@ where
|
|||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
loop {
|
loop {
|
||||||
log::trace!("poll");
|
tracing::trace!("poll");
|
||||||
let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
|
let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
|
||||||
Some(Ok(bytes)) => bytes,
|
Some(Ok(bytes)) => bytes,
|
||||||
Some(Err(e)) => return Poll::Ready(Some(Err(map_err(e)))),
|
Some(Err(e)) => return Poll::Ready(Some(Err(map_err(e)))),
|
||||||
None => return Poll::Ready(None),
|
None => return Poll::Ready(None),
|
||||||
};
|
};
|
||||||
|
|
||||||
log::trace!("poll; bytes={}B", bytes.len());
|
tracing::trace!("poll; bytes={}B", bytes.len());
|
||||||
if let Some(frame) = self.decode_frame(bytes)? {
|
if let Some(frame) = self.decode_frame(bytes)? {
|
||||||
log::debug!("received; frame={:?}", frame);
|
tracing::debug!("received; frame={:?}", frame);
|
||||||
return Poll::Ready(Some(Ok(frame)));
|
return Poll::Ready(Some(Ok(frame)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ where
|
|||||||
// Ensure that we have enough capacity to accept the write.
|
// Ensure that we have enough capacity to accept the write.
|
||||||
assert!(self.has_capacity());
|
assert!(self.has_capacity());
|
||||||
|
|
||||||
log::debug!("send; frame={:?}", item);
|
tracing::debug!("send; frame={:?}", item);
|
||||||
|
|
||||||
match item {
|
match item {
|
||||||
Frame::Data(mut v) => {
|
Frame::Data(mut v) => {
|
||||||
@@ -150,31 +150,31 @@ where
|
|||||||
}
|
}
|
||||||
Frame::Settings(v) => {
|
Frame::Settings(v) => {
|
||||||
v.encode(self.buf.get_mut());
|
v.encode(self.buf.get_mut());
|
||||||
log::trace!("encoded settings; rem={:?}", self.buf.remaining());
|
tracing::trace!("encoded settings; rem={:?}", self.buf.remaining());
|
||||||
}
|
}
|
||||||
Frame::GoAway(v) => {
|
Frame::GoAway(v) => {
|
||||||
v.encode(self.buf.get_mut());
|
v.encode(self.buf.get_mut());
|
||||||
log::trace!("encoded go_away; rem={:?}", self.buf.remaining());
|
tracing::trace!("encoded go_away; rem={:?}", self.buf.remaining());
|
||||||
}
|
}
|
||||||
Frame::Ping(v) => {
|
Frame::Ping(v) => {
|
||||||
v.encode(self.buf.get_mut());
|
v.encode(self.buf.get_mut());
|
||||||
log::trace!("encoded ping; rem={:?}", self.buf.remaining());
|
tracing::trace!("encoded ping; rem={:?}", self.buf.remaining());
|
||||||
}
|
}
|
||||||
Frame::WindowUpdate(v) => {
|
Frame::WindowUpdate(v) => {
|
||||||
v.encode(self.buf.get_mut());
|
v.encode(self.buf.get_mut());
|
||||||
log::trace!("encoded window_update; rem={:?}", self.buf.remaining());
|
tracing::trace!("encoded window_update; rem={:?}", self.buf.remaining());
|
||||||
}
|
}
|
||||||
|
|
||||||
Frame::Priority(_) => {
|
Frame::Priority(_) => {
|
||||||
/*
|
/*
|
||||||
v.encode(self.buf.get_mut());
|
v.encode(self.buf.get_mut());
|
||||||
log::trace!("encoded priority; rem={:?}", self.buf.remaining());
|
tracing::trace!("encoded priority; rem={:?}", self.buf.remaining());
|
||||||
*/
|
*/
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
Frame::Reset(v) => {
|
Frame::Reset(v) => {
|
||||||
v.encode(self.buf.get_mut());
|
v.encode(self.buf.get_mut());
|
||||||
log::trace!("encoded reset; rem={:?}", self.buf.remaining());
|
tracing::trace!("encoded reset; rem={:?}", self.buf.remaining());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -183,18 +183,18 @@ where
|
|||||||
|
|
||||||
/// Flush buffered data to the wire
|
/// Flush buffered data to the wire
|
||||||
pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
|
pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||||
log::trace!("flush");
|
tracing::trace!("flush");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
while !self.is_empty() {
|
while !self.is_empty() {
|
||||||
match self.next {
|
match self.next {
|
||||||
Some(Next::Data(ref mut frame)) => {
|
Some(Next::Data(ref mut frame)) => {
|
||||||
log::trace!(" -> queued data frame");
|
tracing::trace!(" -> queued data frame");
|
||||||
let mut buf = (&mut self.buf).chain(frame.payload_mut());
|
let mut buf = (&mut self.buf).chain(frame.payload_mut());
|
||||||
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut buf))?;
|
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut buf))?;
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
log::trace!(" -> not a queued data frame");
|
tracing::trace!(" -> not a queued data frame");
|
||||||
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut self.buf))?;
|
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut self.buf))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -234,7 +234,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log::trace!("flushing buffer");
|
tracing::trace!("flushing buffer");
|
||||||
// Flush the upstream
|
// Flush the upstream
|
||||||
ready!(Pin::new(&mut self.inner).poll_flush(cx))?;
|
ready!(Pin::new(&mut self.inner).poll_flush(cx))?;
|
||||||
|
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ impl GoAway {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn encode<B: BufMut>(&self, dst: &mut B) {
|
pub fn encode<B: BufMut>(&self, dst: &mut B) {
|
||||||
log::trace!("encoding GO_AWAY; code={:?}", self.error_code);
|
tracing::trace!("encoding GO_AWAY; code={:?}", self.error_code);
|
||||||
let head = Head::new(Kind::GoAway, 0, StreamId::zero());
|
let head = Head::new(Kind::GoAway, 0, StreamId::zero());
|
||||||
head.encode(8, dst);
|
head.encode(8, dst);
|
||||||
dst.put_u32(self.last_stream_id.into());
|
dst.put_u32(self.last_stream_id.into());
|
||||||
|
|||||||
@@ -153,7 +153,7 @@ impl Headers {
|
|||||||
let flags = HeadersFlag(head.flag());
|
let flags = HeadersFlag(head.flag());
|
||||||
let mut pad = 0;
|
let mut pad = 0;
|
||||||
|
|
||||||
log::trace!("loading headers; flags={:?}", flags);
|
tracing::trace!("loading headers; flags={:?}", flags);
|
||||||
|
|
||||||
// Read the padding length
|
// Read the padding length
|
||||||
if flags.is_padded() {
|
if flags.is_padded() {
|
||||||
@@ -817,10 +817,10 @@ impl HeaderBlock {
|
|||||||
macro_rules! set_pseudo {
|
macro_rules! set_pseudo {
|
||||||
($field:ident, $val:expr) => {{
|
($field:ident, $val:expr) => {{
|
||||||
if reg {
|
if reg {
|
||||||
log::trace!("load_hpack; header malformed -- pseudo not at head of block");
|
tracing::trace!("load_hpack; header malformed -- pseudo not at head of block");
|
||||||
malformed = true;
|
malformed = true;
|
||||||
} else if self.pseudo.$field.is_some() {
|
} else if self.pseudo.$field.is_some() {
|
||||||
log::trace!("load_hpack; header malformed -- repeated pseudo");
|
tracing::trace!("load_hpack; header malformed -- repeated pseudo");
|
||||||
malformed = true;
|
malformed = true;
|
||||||
} else {
|
} else {
|
||||||
let __val = $val;
|
let __val = $val;
|
||||||
@@ -829,7 +829,7 @@ impl HeaderBlock {
|
|||||||
if headers_size < max_header_list_size {
|
if headers_size < max_header_list_size {
|
||||||
self.pseudo.$field = Some(__val);
|
self.pseudo.$field = Some(__val);
|
||||||
} else if !self.is_over_size {
|
} else if !self.is_over_size {
|
||||||
log::trace!("load_hpack; header list size over max");
|
tracing::trace!("load_hpack; header list size over max");
|
||||||
self.is_over_size = true;
|
self.is_over_size = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -856,10 +856,13 @@ impl HeaderBlock {
|
|||||||
|| name == "keep-alive"
|
|| name == "keep-alive"
|
||||||
|| name == "proxy-connection"
|
|| name == "proxy-connection"
|
||||||
{
|
{
|
||||||
log::trace!("load_hpack; connection level header");
|
tracing::trace!("load_hpack; connection level header");
|
||||||
malformed = true;
|
malformed = true;
|
||||||
} else if name == header::TE && value != "trailers" {
|
} else if name == header::TE && value != "trailers" {
|
||||||
log::trace!("load_hpack; TE header not set to trailers; val={:?}", value);
|
tracing::trace!(
|
||||||
|
"load_hpack; TE header not set to trailers; val={:?}",
|
||||||
|
value
|
||||||
|
);
|
||||||
malformed = true;
|
malformed = true;
|
||||||
} else {
|
} else {
|
||||||
reg = true;
|
reg = true;
|
||||||
@@ -868,7 +871,7 @@ impl HeaderBlock {
|
|||||||
if headers_size < max_header_list_size {
|
if headers_size < max_header_list_size {
|
||||||
self.fields.append(name, value);
|
self.fields.append(name, value);
|
||||||
} else if !self.is_over_size {
|
} else if !self.is_over_size {
|
||||||
log::trace!("load_hpack; header list size over max");
|
tracing::trace!("load_hpack; header list size over max");
|
||||||
self.is_over_size = true;
|
self.is_over_size = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -882,12 +885,12 @@ impl HeaderBlock {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
log::trace!("hpack decoding error; err={:?}", e);
|
tracing::trace!("hpack decoding error; err={:?}", e);
|
||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
if malformed {
|
if malformed {
|
||||||
log::trace!("malformed message");
|
tracing::trace!("malformed message");
|
||||||
return Err(Error::MalformedMessage);
|
return Err(Error::MalformedMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -85,7 +85,7 @@ impl Ping {
|
|||||||
|
|
||||||
pub fn encode<B: BufMut>(&self, dst: &mut B) {
|
pub fn encode<B: BufMut>(&self, dst: &mut B) {
|
||||||
let sz = self.payload.len();
|
let sz = self.payload.len();
|
||||||
log::trace!("encoding PING; ack={} len={}", self.ack, sz);
|
tracing::trace!("encoding PING; ack={} len={}", self.ack, sz);
|
||||||
|
|
||||||
let flags = if self.ack { ACK_FLAG } else { 0 };
|
let flags = if self.ack { ACK_FLAG } else { 0 };
|
||||||
let head = Head::new(Kind::Ping, flags, StreamId::zero());
|
let head = Head::new(Kind::Ping, flags, StreamId::zero());
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ impl Reset {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn encode<B: BufMut>(&self, dst: &mut B) {
|
pub fn encode<B: BufMut>(&self, dst: &mut B) {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"encoding RESET; id={:?} code={:?}",
|
"encoding RESET; id={:?} code={:?}",
|
||||||
self.stream_id,
|
self.stream_id,
|
||||||
self.error_code
|
self.error_code
|
||||||
|
|||||||
@@ -141,7 +141,7 @@ impl Settings {
|
|||||||
|
|
||||||
// Ensure the payload length is correct, each setting is 6 bytes long.
|
// Ensure the payload length is correct, each setting is 6 bytes long.
|
||||||
if payload.len() % 6 != 0 {
|
if payload.len() % 6 != 0 {
|
||||||
log::debug!("invalid settings payload length; len={:?}", payload.len());
|
tracing::debug!("invalid settings payload length; len={:?}", payload.len());
|
||||||
return Err(Error::InvalidPayloadAckSettings);
|
return Err(Error::InvalidPayloadAckSettings);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -199,13 +199,13 @@ impl Settings {
|
|||||||
let head = Head::new(Kind::Settings, self.flags.into(), StreamId::zero());
|
let head = Head::new(Kind::Settings, self.flags.into(), StreamId::zero());
|
||||||
let payload_len = self.payload_len();
|
let payload_len = self.payload_len();
|
||||||
|
|
||||||
log::trace!("encoding SETTINGS; len={}", payload_len);
|
tracing::trace!("encoding SETTINGS; len={}", payload_len);
|
||||||
|
|
||||||
head.encode(payload_len, dst);
|
head.encode(payload_len, dst);
|
||||||
|
|
||||||
// Encode the settings
|
// Encode the settings
|
||||||
self.for_each(|setting| {
|
self.for_each(|setting| {
|
||||||
log::trace!("encoding setting; val={:?}", setting);
|
tracing::trace!("encoding setting; val={:?}", setting);
|
||||||
setting.encode(dst)
|
setting.encode(dst)
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ impl WindowUpdate {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn encode<B: BufMut>(&self, dst: &mut B) {
|
pub fn encode<B: BufMut>(&self, dst: &mut B) {
|
||||||
log::trace!("encoding WINDOW_UPDATE; id={:?}", self.stream_id);
|
tracing::trace!("encoding WINDOW_UPDATE; id={:?}", self.stream_id);
|
||||||
let head = Head::new(Kind::WindowUpdate, 0, self.stream_id);
|
let head = Head::new(Kind::WindowUpdate, 0, self.stream_id);
|
||||||
head.encode(4, dst);
|
head.encode(4, dst);
|
||||||
dst.put_u32(self.size_increment);
|
dst.put_u32(self.size_increment);
|
||||||
|
|||||||
@@ -183,7 +183,7 @@ impl Decoder {
|
|||||||
self.last_max_update = size;
|
self.last_max_update = size;
|
||||||
}
|
}
|
||||||
|
|
||||||
log::trace!("decode");
|
tracing::trace!("decode");
|
||||||
|
|
||||||
while let Some(ty) = peek_u8(src) {
|
while let Some(ty) = peek_u8(src) {
|
||||||
// At this point we are always at the beginning of the next block
|
// At this point we are always at the beginning of the next block
|
||||||
@@ -191,14 +191,14 @@ impl Decoder {
|
|||||||
// determined from the first byte.
|
// determined from the first byte.
|
||||||
match Representation::load(ty)? {
|
match Representation::load(ty)? {
|
||||||
Indexed => {
|
Indexed => {
|
||||||
log::trace!(" Indexed; rem={:?}", src.remaining());
|
tracing::trace!(" Indexed; rem={:?}", src.remaining());
|
||||||
can_resize = false;
|
can_resize = false;
|
||||||
let entry = self.decode_indexed(src)?;
|
let entry = self.decode_indexed(src)?;
|
||||||
consume(src);
|
consume(src);
|
||||||
f(entry);
|
f(entry);
|
||||||
}
|
}
|
||||||
LiteralWithIndexing => {
|
LiteralWithIndexing => {
|
||||||
log::trace!(" LiteralWithIndexing; rem={:?}", src.remaining());
|
tracing::trace!(" LiteralWithIndexing; rem={:?}", src.remaining());
|
||||||
can_resize = false;
|
can_resize = false;
|
||||||
let entry = self.decode_literal(src, true)?;
|
let entry = self.decode_literal(src, true)?;
|
||||||
|
|
||||||
@@ -209,14 +209,14 @@ impl Decoder {
|
|||||||
f(entry);
|
f(entry);
|
||||||
}
|
}
|
||||||
LiteralWithoutIndexing => {
|
LiteralWithoutIndexing => {
|
||||||
log::trace!(" LiteralWithoutIndexing; rem={:?}", src.remaining());
|
tracing::trace!(" LiteralWithoutIndexing; rem={:?}", src.remaining());
|
||||||
can_resize = false;
|
can_resize = false;
|
||||||
let entry = self.decode_literal(src, false)?;
|
let entry = self.decode_literal(src, false)?;
|
||||||
consume(src);
|
consume(src);
|
||||||
f(entry);
|
f(entry);
|
||||||
}
|
}
|
||||||
LiteralNeverIndexed => {
|
LiteralNeverIndexed => {
|
||||||
log::trace!(" LiteralNeverIndexed; rem={:?}", src.remaining());
|
tracing::trace!(" LiteralNeverIndexed; rem={:?}", src.remaining());
|
||||||
can_resize = false;
|
can_resize = false;
|
||||||
let entry = self.decode_literal(src, false)?;
|
let entry = self.decode_literal(src, false)?;
|
||||||
consume(src);
|
consume(src);
|
||||||
@@ -226,7 +226,7 @@ impl Decoder {
|
|||||||
f(entry);
|
f(entry);
|
||||||
}
|
}
|
||||||
SizeUpdate => {
|
SizeUpdate => {
|
||||||
log::trace!(" SizeUpdate; rem={:?}", src.remaining());
|
tracing::trace!(" SizeUpdate; rem={:?}", src.remaining());
|
||||||
if !can_resize {
|
if !can_resize {
|
||||||
return Err(DecoderError::InvalidMaxDynamicSize);
|
return Err(DecoderError::InvalidMaxDynamicSize);
|
||||||
}
|
}
|
||||||
@@ -248,7 +248,7 @@ impl Decoder {
|
|||||||
return Err(DecoderError::InvalidMaxDynamicSize);
|
return Err(DecoderError::InvalidMaxDynamicSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
"Decoder changed max table size from {} to {}",
|
"Decoder changed max table size from {} to {}",
|
||||||
self.table.size(),
|
self.table.size(),
|
||||||
new_size
|
new_size
|
||||||
@@ -302,7 +302,7 @@ impl Decoder {
|
|||||||
let len = decode_int(buf, 7)?;
|
let len = decode_int(buf, 7)?;
|
||||||
|
|
||||||
if len > buf.remaining() {
|
if len > buf.remaining() {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"decode_string underflow; len={}; remaining={}",
|
"decode_string underflow; len={}; remaining={}",
|
||||||
len,
|
len,
|
||||||
buf.remaining()
|
buf.remaining()
|
||||||
|
|||||||
@@ -84,10 +84,10 @@
|
|||||||
|
|
||||||
macro_rules! proto_err {
|
macro_rules! proto_err {
|
||||||
(conn: $($msg:tt)+) => {
|
(conn: $($msg:tt)+) => {
|
||||||
log::debug!("connection error PROTOCOL_ERROR -- {};", format_args!($($msg)+))
|
tracing::debug!("connection error PROTOCOL_ERROR -- {};", format_args!($($msg)+))
|
||||||
};
|
};
|
||||||
(stream: $($msg:tt)+) => {
|
(stream: $($msg:tt)+) => {
|
||||||
log::debug!("stream error PROTOCOL_ERROR -- {};", format_args!($($msg)+))
|
tracing::debug!("stream error PROTOCOL_ERROR -- {};", format_args!($($msg)+))
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -230,13 +230,13 @@ where
|
|||||||
// error. This is handled by setting a GOAWAY frame followed by
|
// error. This is handled by setting a GOAWAY frame followed by
|
||||||
// terminating the connection.
|
// terminating the connection.
|
||||||
Poll::Ready(Err(Connection(e))) => {
|
Poll::Ready(Err(Connection(e))) => {
|
||||||
log::debug!("Connection::poll; connection error={:?}", e);
|
tracing::debug!("Connection::poll; connection error={:?}", e);
|
||||||
|
|
||||||
// We may have already sent a GOAWAY for this error,
|
// We may have already sent a GOAWAY for this error,
|
||||||
// if so, don't send another, just flush and close up.
|
// if so, don't send another, just flush and close up.
|
||||||
if let Some(reason) = self.go_away.going_away_reason() {
|
if let Some(reason) = self.go_away.going_away_reason() {
|
||||||
if reason == e {
|
if reason == e {
|
||||||
log::trace!(" -> already going away");
|
tracing::trace!(" -> already going away");
|
||||||
self.state = State::Closing(e);
|
self.state = State::Closing(e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -250,7 +250,7 @@ where
|
|||||||
// This is handled by resetting the frame then trying to read
|
// This is handled by resetting the frame then trying to read
|
||||||
// another frame.
|
// another frame.
|
||||||
Poll::Ready(Err(Stream { id, reason })) => {
|
Poll::Ready(Err(Stream { id, reason })) => {
|
||||||
log::trace!("stream error; id={:?}; reason={:?}", id, reason);
|
tracing::trace!("stream error; id={:?}; reason={:?}", id, reason);
|
||||||
self.streams.send_reset(id, reason);
|
self.streams.send_reset(id, reason);
|
||||||
}
|
}
|
||||||
// Attempting to read a frame resulted in an I/O error. All
|
// Attempting to read a frame resulted in an I/O error. All
|
||||||
@@ -258,7 +258,7 @@ where
|
|||||||
//
|
//
|
||||||
// TODO: Are I/O errors recoverable?
|
// TODO: Are I/O errors recoverable?
|
||||||
Poll::Ready(Err(Io(e))) => {
|
Poll::Ready(Err(Io(e))) => {
|
||||||
log::debug!("Connection::poll; IO error={:?}", e);
|
tracing::debug!("Connection::poll; IO error={:?}", e);
|
||||||
let e = e.into();
|
let e = e.into();
|
||||||
|
|
||||||
// Reset all active streams
|
// Reset all active streams
|
||||||
@@ -270,7 +270,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
State::Closing(reason) => {
|
State::Closing(reason) => {
|
||||||
log::trace!("connection closing after flush");
|
tracing::trace!("connection closing after flush");
|
||||||
// Flush/shutdown the codec
|
// Flush/shutdown the codec
|
||||||
ready!(self.codec.shutdown(cx))?;
|
ready!(self.codec.shutdown(cx))?;
|
||||||
|
|
||||||
@@ -317,28 +317,28 @@ where
|
|||||||
|
|
||||||
match ready!(Pin::new(&mut self.codec).poll_next(cx)?) {
|
match ready!(Pin::new(&mut self.codec).poll_next(cx)?) {
|
||||||
Some(Headers(frame)) => {
|
Some(Headers(frame)) => {
|
||||||
log::trace!("recv HEADERS; frame={:?}", frame);
|
tracing::trace!("recv HEADERS; frame={:?}", frame);
|
||||||
self.streams.recv_headers(frame)?;
|
self.streams.recv_headers(frame)?;
|
||||||
}
|
}
|
||||||
Some(Data(frame)) => {
|
Some(Data(frame)) => {
|
||||||
log::trace!("recv DATA; frame={:?}", frame);
|
tracing::trace!("recv DATA; frame={:?}", frame);
|
||||||
self.streams.recv_data(frame)?;
|
self.streams.recv_data(frame)?;
|
||||||
}
|
}
|
||||||
Some(Reset(frame)) => {
|
Some(Reset(frame)) => {
|
||||||
log::trace!("recv RST_STREAM; frame={:?}", frame);
|
tracing::trace!("recv RST_STREAM; frame={:?}", frame);
|
||||||
self.streams.recv_reset(frame)?;
|
self.streams.recv_reset(frame)?;
|
||||||
}
|
}
|
||||||
Some(PushPromise(frame)) => {
|
Some(PushPromise(frame)) => {
|
||||||
log::trace!("recv PUSH_PROMISE; frame={:?}", frame);
|
tracing::trace!("recv PUSH_PROMISE; frame={:?}", frame);
|
||||||
self.streams.recv_push_promise(frame)?;
|
self.streams.recv_push_promise(frame)?;
|
||||||
}
|
}
|
||||||
Some(Settings(frame)) => {
|
Some(Settings(frame)) => {
|
||||||
log::trace!("recv SETTINGS; frame={:?}", frame);
|
tracing::trace!("recv SETTINGS; frame={:?}", frame);
|
||||||
self.settings
|
self.settings
|
||||||
.recv_settings(frame, &mut self.codec, &mut self.streams)?;
|
.recv_settings(frame, &mut self.codec, &mut self.streams)?;
|
||||||
}
|
}
|
||||||
Some(GoAway(frame)) => {
|
Some(GoAway(frame)) => {
|
||||||
log::trace!("recv GOAWAY; frame={:?}", frame);
|
tracing::trace!("recv GOAWAY; frame={:?}", frame);
|
||||||
// This should prevent starting new streams,
|
// This should prevent starting new streams,
|
||||||
// but should allow continuing to process current streams
|
// but should allow continuing to process current streams
|
||||||
// until they are all EOS. Once they are, State should
|
// until they are all EOS. Once they are, State should
|
||||||
@@ -347,7 +347,7 @@ where
|
|||||||
self.error = Some(frame.reason());
|
self.error = Some(frame.reason());
|
||||||
}
|
}
|
||||||
Some(Ping(frame)) => {
|
Some(Ping(frame)) => {
|
||||||
log::trace!("recv PING; frame={:?}", frame);
|
tracing::trace!("recv PING; frame={:?}", frame);
|
||||||
let status = self.ping_pong.recv_ping(frame);
|
let status = self.ping_pong.recv_ping(frame);
|
||||||
if status.is_shutdown() {
|
if status.is_shutdown() {
|
||||||
assert!(
|
assert!(
|
||||||
@@ -360,15 +360,15 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(WindowUpdate(frame)) => {
|
Some(WindowUpdate(frame)) => {
|
||||||
log::trace!("recv WINDOW_UPDATE; frame={:?}", frame);
|
tracing::trace!("recv WINDOW_UPDATE; frame={:?}", frame);
|
||||||
self.streams.recv_window_update(frame)?;
|
self.streams.recv_window_update(frame)?;
|
||||||
}
|
}
|
||||||
Some(Priority(frame)) => {
|
Some(Priority(frame)) => {
|
||||||
log::trace!("recv PRIORITY; frame={:?}", frame);
|
tracing::trace!("recv PRIORITY; frame={:?}", frame);
|
||||||
// TODO: handle
|
// TODO: handle
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
log::trace!("codec closed");
|
tracing::trace!("codec closed");
|
||||||
self.streams.recv_eof(false).expect("mutex poisoned");
|
self.streams.recv_eof(false).expect("mutex poisoned");
|
||||||
return Poll::Ready(Ok(()));
|
return Poll::Ready(Ok(()));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ impl PingPong {
|
|||||||
&Ping::SHUTDOWN,
|
&Ping::SHUTDOWN,
|
||||||
"pending_ping should be for shutdown",
|
"pending_ping should be for shutdown",
|
||||||
);
|
);
|
||||||
log::trace!("recv PING SHUTDOWN ack");
|
tracing::trace!("recv PING SHUTDOWN ack");
|
||||||
return ReceivedPing::Shutdown;
|
return ReceivedPing::Shutdown;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -117,7 +117,7 @@ impl PingPong {
|
|||||||
|
|
||||||
if let Some(ref users) = self.user_pings {
|
if let Some(ref users) = self.user_pings {
|
||||||
if ping.payload() == &Ping::USER && users.receive_pong() {
|
if ping.payload() == &Ping::USER && users.receive_pong() {
|
||||||
log::trace!("recv PING USER ack");
|
tracing::trace!("recv PING USER ack");
|
||||||
return ReceivedPing::Unknown;
|
return ReceivedPing::Unknown;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -125,7 +125,7 @@ impl PingPong {
|
|||||||
// else we were acked a ping we didn't send?
|
// else we were acked a ping we didn't send?
|
||||||
// The spec doesn't require us to do anything about this,
|
// The spec doesn't require us to do anything about this,
|
||||||
// so for resiliency, just ignore it for now.
|
// so for resiliency, just ignore it for now.
|
||||||
log::warn!("recv PING ack that we never sent: {:?}", ping);
|
tracing::warn!("recv PING ack that we never sent: {:?}", ping);
|
||||||
ReceivedPing::Unknown
|
ReceivedPing::Unknown
|
||||||
} else {
|
} else {
|
||||||
// Save the ping's payload to be sent as an acknowledgement.
|
// Save the ping's payload to be sent as an acknowledgement.
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ impl Settings {
|
|||||||
if frame.is_ack() {
|
if frame.is_ack() {
|
||||||
match &self.local {
|
match &self.local {
|
||||||
Local::WaitingAck(local) => {
|
Local::WaitingAck(local) => {
|
||||||
log::debug!("received settings ACK; applying {:?}", local);
|
tracing::debug!("received settings ACK; applying {:?}", local);
|
||||||
|
|
||||||
if let Some(max) = local.max_frame_size() {
|
if let Some(max) = local.max_frame_size() {
|
||||||
codec.set_max_recv_frame_size(max as usize);
|
codec.set_max_recv_frame_size(max as usize);
|
||||||
@@ -85,7 +85,7 @@ impl Settings {
|
|||||||
match &self.local {
|
match &self.local {
|
||||||
Local::ToSend(..) | Local::WaitingAck(..) => Err(UserError::SendSettingsWhilePending),
|
Local::ToSend(..) | Local::WaitingAck(..) => Err(UserError::SendSettingsWhilePending),
|
||||||
Local::Synced => {
|
Local::Synced => {
|
||||||
log::trace!("queue to send local settings: {:?}", frame);
|
tracing::trace!("queue to send local settings: {:?}", frame);
|
||||||
self.local = Local::ToSend(frame);
|
self.local = Local::ToSend(frame);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -115,7 +115,7 @@ impl Settings {
|
|||||||
// Buffer the settings frame
|
// Buffer the settings frame
|
||||||
dst.buffer(frame.into()).expect("invalid settings frame");
|
dst.buffer(frame.into()).expect("invalid settings frame");
|
||||||
|
|
||||||
log::trace!("ACK sent; applying settings");
|
tracing::trace!("ACK sent; applying settings");
|
||||||
|
|
||||||
if let Some(val) = settings.header_table_size() {
|
if let Some(val) = settings.header_table_size() {
|
||||||
dst.set_send_header_table_size(val as usize);
|
dst.set_send_header_table_size(val as usize);
|
||||||
@@ -139,7 +139,7 @@ impl Settings {
|
|||||||
// Buffer the settings frame
|
// Buffer the settings frame
|
||||||
dst.buffer(settings.clone().into())
|
dst.buffer(settings.clone().into())
|
||||||
.expect("invalid settings frame");
|
.expect("invalid settings frame");
|
||||||
log::trace!("local settings sent; waiting for ack: {:?}", settings);
|
tracing::trace!("local settings sent; waiting for ack: {:?}", settings);
|
||||||
|
|
||||||
self.local = Local::WaitingAck(settings.clone());
|
self.local = Local::WaitingAck(settings.clone());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -133,7 +133,7 @@ impl Counts {
|
|||||||
|
|
||||||
// TODO: move this to macro?
|
// TODO: move this to macro?
|
||||||
pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
|
pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"transition_after; stream={:?}; state={:?}; is_closed={:?}; \
|
"transition_after; stream={:?}; state={:?}; is_closed={:?}; \
|
||||||
pending_send_empty={:?}; buffered_send_data={}; \
|
pending_send_empty={:?}; buffered_send_data={}; \
|
||||||
num_recv={}; num_send={}",
|
num_recv={}; num_send={}",
|
||||||
@@ -155,7 +155,7 @@ impl Counts {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if stream.is_counted {
|
if stream.is_counted {
|
||||||
log::trace!("dec_num_streams; stream={:?}", stream.id);
|
tracing::trace!("dec_num_streams; stream={:?}", stream.id);
|
||||||
// Decrement the number of active streams.
|
// Decrement the number of active streams.
|
||||||
self.dec_num_streams(&mut stream);
|
self.dec_num_streams(&mut stream);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -120,7 +120,7 @@ impl FlowControl {
|
|||||||
return Err(Reason::FLOW_CONTROL_ERROR);
|
return Err(Reason::FLOW_CONTROL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"inc_window; sz={}; old={}; new={}",
|
"inc_window; sz={}; old={}; new={}",
|
||||||
sz,
|
sz,
|
||||||
self.window_size,
|
self.window_size,
|
||||||
@@ -136,7 +136,7 @@ impl FlowControl {
|
|||||||
/// This is called after receiving a SETTINGS frame with a lower
|
/// This is called after receiving a SETTINGS frame with a lower
|
||||||
/// INITIAL_WINDOW_SIZE value.
|
/// INITIAL_WINDOW_SIZE value.
|
||||||
pub fn dec_send_window(&mut self, sz: WindowSize) {
|
pub fn dec_send_window(&mut self, sz: WindowSize) {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"dec_window; sz={}; window={}, available={}",
|
"dec_window; sz={}; window={}, available={}",
|
||||||
sz,
|
sz,
|
||||||
self.window_size,
|
self.window_size,
|
||||||
@@ -151,7 +151,7 @@ impl FlowControl {
|
|||||||
/// This is called after receiving a SETTINGS ACK frame with a lower
|
/// This is called after receiving a SETTINGS ACK frame with a lower
|
||||||
/// INITIAL_WINDOW_SIZE value.
|
/// INITIAL_WINDOW_SIZE value.
|
||||||
pub fn dec_recv_window(&mut self, sz: WindowSize) {
|
pub fn dec_recv_window(&mut self, sz: WindowSize) {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"dec_recv_window; sz={}; window={}, available={}",
|
"dec_recv_window; sz={}; window={}, available={}",
|
||||||
sz,
|
sz,
|
||||||
self.window_size,
|
self.window_size,
|
||||||
@@ -165,7 +165,7 @@ impl FlowControl {
|
|||||||
/// Decrements the window reflecting data has actually been sent. The caller
|
/// Decrements the window reflecting data has actually been sent. The caller
|
||||||
/// must ensure that the window has capacity.
|
/// must ensure that the window has capacity.
|
||||||
pub fn send_data(&mut self, sz: WindowSize) {
|
pub fn send_data(&mut self, sz: WindowSize) {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"send_data; sz={}; window={}; available={}",
|
"send_data; sz={}; window={}; available={}",
|
||||||
sz,
|
sz,
|
||||||
self.window_size,
|
self.window_size,
|
||||||
|
|||||||
@@ -84,7 +84,7 @@ impl Prioritize {
|
|||||||
|
|
||||||
flow.assign_capacity(config.remote_init_window_sz);
|
flow.assign_capacity(config.remote_init_window_sz);
|
||||||
|
|
||||||
log::trace!("Prioritize::new; flow={:?}", flow);
|
tracing::trace!("Prioritize::new; flow={:?}", flow);
|
||||||
|
|
||||||
Prioritize {
|
Prioritize {
|
||||||
pending_send: store::Queue::new(),
|
pending_send: store::Queue::new(),
|
||||||
@@ -112,7 +112,7 @@ impl Prioritize {
|
|||||||
pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
|
pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
|
||||||
// If the stream is waiting to be opened, nothing more to do.
|
// If the stream is waiting to be opened, nothing more to do.
|
||||||
if stream.is_send_ready() {
|
if stream.is_send_ready() {
|
||||||
log::trace!("schedule_send; {:?}", stream.id);
|
tracing::trace!("schedule_send; {:?}", stream.id);
|
||||||
// Queue the stream
|
// Queue the stream
|
||||||
self.pending_send.push(stream);
|
self.pending_send.push(stream);
|
||||||
|
|
||||||
@@ -158,7 +158,7 @@ impl Prioritize {
|
|||||||
// Update the buffered data counter
|
// Update the buffered data counter
|
||||||
stream.buffered_send_data += sz;
|
stream.buffered_send_data += sz;
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"send_data; sz={}; buffered={}; requested={}",
|
"send_data; sz={}; buffered={}; requested={}",
|
||||||
sz,
|
sz,
|
||||||
stream.buffered_send_data,
|
stream.buffered_send_data,
|
||||||
@@ -179,7 +179,7 @@ impl Prioritize {
|
|||||||
self.reserve_capacity(0, stream, counts);
|
self.reserve_capacity(0, stream, counts);
|
||||||
}
|
}
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"send_data (2); available={}; buffered={}",
|
"send_data (2); available={}; buffered={}",
|
||||||
stream.send_flow.available(),
|
stream.send_flow.available(),
|
||||||
stream.buffered_send_data
|
stream.buffered_send_data
|
||||||
@@ -214,7 +214,7 @@ impl Prioritize {
|
|||||||
stream: &mut store::Ptr,
|
stream: &mut store::Ptr,
|
||||||
counts: &mut Counts,
|
counts: &mut Counts,
|
||||||
) {
|
) {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
|
"reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
|
||||||
stream.id,
|
stream.id,
|
||||||
capacity,
|
capacity,
|
||||||
@@ -266,7 +266,7 @@ impl Prioritize {
|
|||||||
inc: WindowSize,
|
inc: WindowSize,
|
||||||
stream: &mut store::Ptr,
|
stream: &mut store::Ptr,
|
||||||
) -> Result<(), Reason> {
|
) -> Result<(), Reason> {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
|
"recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
|
||||||
stream.id,
|
stream.id,
|
||||||
stream.state,
|
stream.state,
|
||||||
@@ -326,7 +326,7 @@ impl Prioritize {
|
|||||||
pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
|
pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
|
||||||
while let Some(stream) = self.pending_capacity.pop(store) {
|
while let Some(stream) = self.pending_capacity.pop(store) {
|
||||||
counts.transition(stream, |_, stream| {
|
counts.transition(stream, |_, stream| {
|
||||||
log::trace!("clear_pending_capacity; stream={:?}", stream.id);
|
tracing::trace!("clear_pending_capacity; stream={:?}", stream.id);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -339,7 +339,7 @@ impl Prioritize {
|
|||||||
) where
|
) where
|
||||||
R: Resolve,
|
R: Resolve,
|
||||||
{
|
{
|
||||||
log::trace!("assign_connection_capacity; inc={}", inc);
|
tracing::trace!("assign_connection_capacity; inc={}", inc);
|
||||||
|
|
||||||
self.flow.assign_capacity(inc);
|
self.flow.assign_capacity(inc);
|
||||||
|
|
||||||
@@ -383,7 +383,7 @@ impl Prioritize {
|
|||||||
stream.send_flow.window_size() - stream.send_flow.available().as_size(),
|
stream.send_flow.window_size() - stream.send_flow.available().as_size(),
|
||||||
);
|
);
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"try_assign_capacity; stream={:?}, requested={}; additional={}; buffered={}; window={}; conn={}",
|
"try_assign_capacity; stream={:?}, requested={}; additional={}; buffered={}; window={}; conn={}",
|
||||||
stream.id,
|
stream.id,
|
||||||
total_requested,
|
total_requested,
|
||||||
@@ -416,7 +416,7 @@ impl Prioritize {
|
|||||||
// TODO: Should prioritization factor into this?
|
// TODO: Should prioritization factor into this?
|
||||||
let assign = cmp::min(conn_available, additional);
|
let assign = cmp::min(conn_available, additional);
|
||||||
|
|
||||||
log::trace!(" assigning; stream={:?}, capacity={}", stream.id, assign,);
|
tracing::trace!(" assigning; stream={:?}, capacity={}", stream.id, assign,);
|
||||||
|
|
||||||
// Assign the capacity to the stream
|
// Assign the capacity to the stream
|
||||||
stream.assign_capacity(assign);
|
stream.assign_capacity(assign);
|
||||||
@@ -425,7 +425,7 @@ impl Prioritize {
|
|||||||
self.flow.claim_capacity(assign);
|
self.flow.claim_capacity(assign);
|
||||||
}
|
}
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"try_assign_capacity(2); available={}; requested={}; buffered={}; has_unavailable={:?}",
|
"try_assign_capacity(2); available={}; requested={}; buffered={}; has_unavailable={:?}",
|
||||||
stream.send_flow.available(),
|
stream.send_flow.available(),
|
||||||
stream.requested_send_capacity,
|
stream.requested_send_capacity,
|
||||||
@@ -485,14 +485,14 @@ impl Prioritize {
|
|||||||
// The max frame length
|
// The max frame length
|
||||||
let max_frame_len = dst.max_send_frame_size();
|
let max_frame_len = dst.max_send_frame_size();
|
||||||
|
|
||||||
log::trace!("poll_complete");
|
tracing::trace!("poll_complete");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
self.schedule_pending_open(store, counts);
|
self.schedule_pending_open(store, counts);
|
||||||
|
|
||||||
match self.pop_frame(buffer, store, max_frame_len, counts) {
|
match self.pop_frame(buffer, store, max_frame_len, counts) {
|
||||||
Some(frame) => {
|
Some(frame) => {
|
||||||
log::trace!("writing frame={:?}", frame);
|
tracing::trace!("writing frame={:?}", frame);
|
||||||
|
|
||||||
debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
|
debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
|
||||||
if let Frame::Data(ref frame) = frame {
|
if let Frame::Data(ref frame) = frame {
|
||||||
@@ -538,11 +538,11 @@ impl Prioritize {
|
|||||||
where
|
where
|
||||||
B: Buf,
|
B: Buf,
|
||||||
{
|
{
|
||||||
log::trace!("try reclaim frame");
|
tracing::trace!("try reclaim frame");
|
||||||
|
|
||||||
// First check if there are any data chunks to take back
|
// First check if there are any data chunks to take back
|
||||||
if let Some(frame) = dst.take_last_data_frame() {
|
if let Some(frame) = dst.take_last_data_frame() {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
" -> reclaimed; frame={:?}; sz={}",
|
" -> reclaimed; frame={:?}; sz={}",
|
||||||
frame,
|
frame,
|
||||||
frame.payload().inner.get_ref().remaining()
|
frame.payload().inner.get_ref().remaining()
|
||||||
@@ -554,7 +554,7 @@ impl Prioritize {
|
|||||||
match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
|
match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
|
||||||
InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
|
InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
|
||||||
InFlightData::Drop => {
|
InFlightData::Drop => {
|
||||||
log::trace!("not reclaiming frame for cancelled stream");
|
tracing::trace!("not reclaiming frame for cancelled stream");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
InFlightData::DataFrame(k) => {
|
InFlightData::DataFrame(k) => {
|
||||||
@@ -603,11 +603,11 @@ impl Prioritize {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
|
pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
|
||||||
log::trace!("clear_queue; stream={:?}", stream.id);
|
tracing::trace!("clear_queue; stream={:?}", stream.id);
|
||||||
|
|
||||||
// TODO: make this more efficient?
|
// TODO: make this more efficient?
|
||||||
while let Some(frame) = stream.pending_send.pop_front(buffer) {
|
while let Some(frame) = stream.pending_send.pop_front(buffer) {
|
||||||
log::trace!("dropping; frame={:?}", frame);
|
tracing::trace!("dropping; frame={:?}", frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
stream.buffered_send_data = 0;
|
stream.buffered_send_data = 0;
|
||||||
@@ -644,12 +644,12 @@ impl Prioritize {
|
|||||||
where
|
where
|
||||||
B: Buf,
|
B: Buf,
|
||||||
{
|
{
|
||||||
log::trace!("pop_frame");
|
tracing::trace!("pop_frame");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match self.pending_send.pop(store) {
|
match self.pending_send.pop(store) {
|
||||||
Some(mut stream) => {
|
Some(mut stream) => {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"pop_frame; stream={:?}; stream.state={:?}",
|
"pop_frame; stream={:?}; stream.state={:?}",
|
||||||
stream.id,
|
stream.id,
|
||||||
stream.state
|
stream.state
|
||||||
@@ -662,7 +662,7 @@ impl Prioritize {
|
|||||||
// To be safe, we just always ask the stream.
|
// To be safe, we just always ask the stream.
|
||||||
let is_pending_reset = stream.is_pending_reset_expiration();
|
let is_pending_reset = stream.is_pending_reset_expiration();
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
" --> stream={:?}; is_pending_reset={:?};",
|
" --> stream={:?}; is_pending_reset={:?};",
|
||||||
stream.id,
|
stream.id,
|
||||||
is_pending_reset
|
is_pending_reset
|
||||||
@@ -675,7 +675,7 @@ impl Prioritize {
|
|||||||
let stream_capacity = stream.send_flow.available();
|
let stream_capacity = stream.send_flow.available();
|
||||||
let sz = frame.payload().remaining();
|
let sz = frame.payload().remaining();
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
" --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \
|
" --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \
|
||||||
available={}; requested={}; buffered={};",
|
available={}; requested={}; buffered={};",
|
||||||
frame.stream_id(),
|
frame.stream_id(),
|
||||||
@@ -690,7 +690,7 @@ impl Prioritize {
|
|||||||
// Zero length data frames always have capacity to
|
// Zero length data frames always have capacity to
|
||||||
// be sent.
|
// be sent.
|
||||||
if sz > 0 && stream_capacity == 0 {
|
if sz > 0 && stream_capacity == 0 {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
" --> stream capacity is 0; requested={}",
|
" --> stream capacity is 0; requested={}",
|
||||||
stream.requested_send_capacity
|
stream.requested_send_capacity
|
||||||
);
|
);
|
||||||
@@ -721,10 +721,10 @@ impl Prioritize {
|
|||||||
// capacity at this point.
|
// capacity at this point.
|
||||||
debug_assert!(len <= self.flow.window_size());
|
debug_assert!(len <= self.flow.window_size());
|
||||||
|
|
||||||
log::trace!(" --> sending data frame; len={}", len);
|
tracing::trace!(" --> sending data frame; len={}", len);
|
||||||
|
|
||||||
// Update the flow control
|
// Update the flow control
|
||||||
log::trace!(" -- updating stream flow --");
|
tracing::trace!(" -- updating stream flow --");
|
||||||
stream.send_flow.send_data(len);
|
stream.send_flow.send_data(len);
|
||||||
|
|
||||||
// Decrement the stream's buffered data counter
|
// Decrement the stream's buffered data counter
|
||||||
@@ -737,7 +737,7 @@ impl Prioritize {
|
|||||||
// line.
|
// line.
|
||||||
self.flow.assign_capacity(len);
|
self.flow.assign_capacity(len);
|
||||||
|
|
||||||
log::trace!(" -- updating connection flow --");
|
tracing::trace!(" -- updating connection flow --");
|
||||||
self.flow.send_data(len);
|
self.flow.send_data(len);
|
||||||
|
|
||||||
// Wrap the frame's data payload to ensure that the
|
// Wrap the frame's data payload to ensure that the
|
||||||
@@ -789,7 +789,7 @@ impl Prioritize {
|
|||||||
// had data buffered to be sent, but all the frames are cleared
|
// had data buffered to be sent, but all the frames are cleared
|
||||||
// in clear_queue(). Instead of doing O(N) traversal through queue
|
// in clear_queue(). Instead of doing O(N) traversal through queue
|
||||||
// to remove, lets just ignore the stream here.
|
// to remove, lets just ignore the stream here.
|
||||||
log::trace!("removing dangling stream from pending_send");
|
tracing::trace!("removing dangling stream from pending_send");
|
||||||
// Since this should only happen as a consequence of `clear_queue`,
|
// Since this should only happen as a consequence of `clear_queue`,
|
||||||
// we must be in a closed state of some kind.
|
// we must be in a closed state of some kind.
|
||||||
debug_assert!(stream.state.is_closed());
|
debug_assert!(stream.state.is_closed());
|
||||||
@@ -799,7 +799,7 @@ impl Prioritize {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
log::trace!("pop_frame; frame={:?}", frame);
|
tracing::trace!("pop_frame; frame={:?}", frame);
|
||||||
|
|
||||||
if cfg!(debug_assertions) && stream.state.is_idle() {
|
if cfg!(debug_assertions) && stream.state.is_idle() {
|
||||||
debug_assert!(stream.id > self.last_opened_id);
|
debug_assert!(stream.id > self.last_opened_id);
|
||||||
@@ -824,11 +824,11 @@ impl Prioritize {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
|
fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
|
||||||
log::trace!("schedule_pending_open");
|
tracing::trace!("schedule_pending_open");
|
||||||
// check for any pending open streams
|
// check for any pending open streams
|
||||||
while counts.can_inc_num_send_streams() {
|
while counts.can_inc_num_send_streams() {
|
||||||
if let Some(mut stream) = self.pending_open.pop(store) {
|
if let Some(mut stream) = self.pending_open.pop(store) {
|
||||||
log::trace!("schedule_pending_open; stream={:?}", stream.id);
|
tracing::trace!("schedule_pending_open; stream={:?}", stream.id);
|
||||||
|
|
||||||
counts.inc_num_send_streams(&mut stream);
|
counts.inc_num_send_streams(&mut stream);
|
||||||
self.pending_send.push(&mut stream);
|
self.pending_send.push(&mut stream);
|
||||||
|
|||||||
@@ -160,7 +160,7 @@ impl Recv {
|
|||||||
stream: &mut store::Ptr,
|
stream: &mut store::Ptr,
|
||||||
counts: &mut Counts,
|
counts: &mut Counts,
|
||||||
) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
|
) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
|
||||||
log::trace!("opening stream; init_window={}", self.init_window_sz);
|
tracing::trace!("opening stream; init_window={}", self.init_window_sz);
|
||||||
let is_initial = stream.state.recv_open(frame.is_end_stream())?;
|
let is_initial = stream.state.recv_open(frame.is_end_stream())?;
|
||||||
|
|
||||||
if is_initial {
|
if is_initial {
|
||||||
@@ -206,7 +206,7 @@ impl Recv {
|
|||||||
// So, if peer is a server, we'll send a 431. In either case,
|
// So, if peer is a server, we'll send a 431. In either case,
|
||||||
// an error is recorded, which will send a REFUSED_STREAM,
|
// an error is recorded, which will send a REFUSED_STREAM,
|
||||||
// since we don't want any of the data frames either.
|
// since we don't want any of the data frames either.
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
"stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
|
"stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
|
||||||
recv_headers: frame is over size; stream={:?}",
|
recv_headers: frame is over size; stream={:?}",
|
||||||
stream.id
|
stream.id
|
||||||
@@ -341,7 +341,7 @@ impl Recv {
|
|||||||
|
|
||||||
/// Releases capacity of the connection
|
/// Releases capacity of the connection
|
||||||
pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
|
pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"release_connection_capacity; size={}, connection in_flight_data={}",
|
"release_connection_capacity; size={}, connection in_flight_data={}",
|
||||||
capacity,
|
capacity,
|
||||||
self.in_flight_data,
|
self.in_flight_data,
|
||||||
@@ -367,7 +367,7 @@ impl Recv {
|
|||||||
stream: &mut store::Ptr,
|
stream: &mut store::Ptr,
|
||||||
task: &mut Option<Waker>,
|
task: &mut Option<Waker>,
|
||||||
) -> Result<(), UserError> {
|
) -> Result<(), UserError> {
|
||||||
log::trace!("release_capacity; size={}", capacity);
|
tracing::trace!("release_capacity; size={}", capacity);
|
||||||
|
|
||||||
if capacity > stream.in_flight_recv_data {
|
if capacity > stream.in_flight_recv_data {
|
||||||
return Err(UserError::ReleaseCapacityTooBig);
|
return Err(UserError::ReleaseCapacityTooBig);
|
||||||
@@ -401,7 +401,7 @@ impl Recv {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"auto-release closed stream ({:?}) capacity: {:?}",
|
"auto-release closed stream ({:?}) capacity: {:?}",
|
||||||
stream.id,
|
stream.id,
|
||||||
stream.in_flight_recv_data,
|
stream.in_flight_recv_data,
|
||||||
@@ -426,7 +426,7 @@ impl Recv {
|
|||||||
/// The `task` is an optional parked task for the `Connection` that might
|
/// The `task` is an optional parked task for the `Connection` that might
|
||||||
/// be blocked on needing more window capacity.
|
/// be blocked on needing more window capacity.
|
||||||
pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) {
|
pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"set_target_connection_window; target={}; available={}, reserved={}",
|
"set_target_connection_window; target={}; available={}, reserved={}",
|
||||||
target,
|
target,
|
||||||
self.flow.available(),
|
self.flow.available(),
|
||||||
@@ -469,7 +469,7 @@ impl Recv {
|
|||||||
let old_sz = self.init_window_sz;
|
let old_sz = self.init_window_sz;
|
||||||
self.init_window_sz = target;
|
self.init_window_sz = target;
|
||||||
|
|
||||||
log::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
|
tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
|
||||||
|
|
||||||
// Per RFC 7540 §6.9.2:
|
// Per RFC 7540 §6.9.2:
|
||||||
//
|
//
|
||||||
@@ -490,7 +490,7 @@ impl Recv {
|
|||||||
if target < old_sz {
|
if target < old_sz {
|
||||||
// We must decrease the (local) window on every open stream.
|
// We must decrease the (local) window on every open stream.
|
||||||
let dec = old_sz - target;
|
let dec = old_sz - target;
|
||||||
log::trace!("decrementing all windows; dec={}", dec);
|
tracing::trace!("decrementing all windows; dec={}", dec);
|
||||||
|
|
||||||
store.for_each(|mut stream| {
|
store.for_each(|mut stream| {
|
||||||
stream.recv_flow.dec_recv_window(dec);
|
stream.recv_flow.dec_recv_window(dec);
|
||||||
@@ -499,7 +499,7 @@ impl Recv {
|
|||||||
} else if target > old_sz {
|
} else if target > old_sz {
|
||||||
// We must increase the (local) window on every open stream.
|
// We must increase the (local) window on every open stream.
|
||||||
let inc = target - old_sz;
|
let inc = target - old_sz;
|
||||||
log::trace!("incrementing all windows; inc={}", inc);
|
tracing::trace!("incrementing all windows; inc={}", inc);
|
||||||
store.for_each(|mut stream| {
|
store.for_each(|mut stream| {
|
||||||
// XXX: Shouldn't the peer have already noticed our
|
// XXX: Shouldn't the peer have already noticed our
|
||||||
// overflow and sent us a GOAWAY?
|
// overflow and sent us a GOAWAY?
|
||||||
@@ -549,7 +549,7 @@ impl Recv {
|
|||||||
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
|
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
|
||||||
}
|
}
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"recv_data; size={}; connection={}; stream={}",
|
"recv_data; size={}; connection={}; stream={}",
|
||||||
sz,
|
sz,
|
||||||
self.flow.window_size(),
|
self.flow.window_size(),
|
||||||
@@ -557,7 +557,7 @@ impl Recv {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if is_ignoring_frame {
|
if is_ignoring_frame {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"recv_data; frame ignored on locally reset {:?} for some time",
|
"recv_data; frame ignored on locally reset {:?} for some time",
|
||||||
stream.id,
|
stream.id,
|
||||||
);
|
);
|
||||||
@@ -647,7 +647,7 @@ impl Recv {
|
|||||||
|
|
||||||
pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> {
|
pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> {
|
||||||
if self.flow.window_size() < sz {
|
if self.flow.window_size() < sz {
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
"connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
|
"connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
|
||||||
self.flow.window_size(),
|
self.flow.window_size(),
|
||||||
sz,
|
sz,
|
||||||
@@ -681,7 +681,7 @@ impl Recv {
|
|||||||
// So, if peer is a server, we'll send a 431. In either case,
|
// So, if peer is a server, we'll send a 431. In either case,
|
||||||
// an error is recorded, which will send a REFUSED_STREAM,
|
// an error is recorded, which will send a REFUSED_STREAM,
|
||||||
// since we don't want any of the data frames either.
|
// since we don't want any of the data frames either.
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
"stream error REFUSED_STREAM -- recv_push_promise: \
|
"stream error REFUSED_STREAM -- recv_push_promise: \
|
||||||
headers frame is over size; promised_id={:?};",
|
headers frame is over size; promised_id={:?};",
|
||||||
frame.promised_id(),
|
frame.promised_id(),
|
||||||
@@ -730,7 +730,7 @@ impl Recv {
|
|||||||
pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
|
pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
|
||||||
if let Ok(next) = self.next_stream_id {
|
if let Ok(next) = self.next_stream_id {
|
||||||
if id >= next {
|
if id >= next {
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
"stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
|
"stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
|
||||||
id
|
id
|
||||||
);
|
);
|
||||||
@@ -821,7 +821,7 @@ impl Recv {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
log::trace!("enqueue_reset_expiration; {:?}", stream.id);
|
tracing::trace!("enqueue_reset_expiration; {:?}", stream.id);
|
||||||
|
|
||||||
if !counts.can_inc_num_reset_streams() {
|
if !counts.can_inc_num_reset_streams() {
|
||||||
// try to evict 1 stream if possible
|
// try to evict 1 stream if possible
|
||||||
@@ -891,7 +891,7 @@ impl Recv {
|
|||||||
fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
|
fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
|
||||||
while let Some(stream) = self.pending_window_updates.pop(store) {
|
while let Some(stream) = self.pending_window_updates.pop(store) {
|
||||||
counts.transition(stream, |_, stream| {
|
counts.transition(stream, |_, stream| {
|
||||||
log::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
|
tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -981,7 +981,7 @@ impl Recv {
|
|||||||
};
|
};
|
||||||
|
|
||||||
counts.transition(stream, |_, stream| {
|
counts.transition(stream, |_, stream| {
|
||||||
log::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
|
tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
|
||||||
debug_assert!(!stream.is_pending_window_update);
|
debug_assert!(!stream.is_pending_window_update);
|
||||||
|
|
||||||
if !stream.state.is_recv_streaming() {
|
if !stream.state.is_recv_streaming() {
|
||||||
|
|||||||
@@ -77,11 +77,11 @@ impl Send {
|
|||||||
|| fields.contains_key("keep-alive")
|
|| fields.contains_key("keep-alive")
|
||||||
|| fields.contains_key("proxy-connection")
|
|| fields.contains_key("proxy-connection")
|
||||||
{
|
{
|
||||||
log::debug!("illegal connection-specific headers found");
|
tracing::debug!("illegal connection-specific headers found");
|
||||||
return Err(UserError::MalformedHeaders);
|
return Err(UserError::MalformedHeaders);
|
||||||
} else if let Some(te) = fields.get(http::header::TE) {
|
} else if let Some(te) = fields.get(http::header::TE) {
|
||||||
if te != "trailers" {
|
if te != "trailers" {
|
||||||
log::debug!("illegal connection-specific headers found");
|
tracing::debug!("illegal connection-specific headers found");
|
||||||
return Err(UserError::MalformedHeaders);
|
return Err(UserError::MalformedHeaders);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -95,7 +95,7 @@ impl Send {
|
|||||||
stream: &mut store::Ptr,
|
stream: &mut store::Ptr,
|
||||||
task: &mut Option<Waker>,
|
task: &mut Option<Waker>,
|
||||||
) -> Result<(), UserError> {
|
) -> Result<(), UserError> {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"send_push_promise; frame={:?}; init_window={:?}",
|
"send_push_promise; frame={:?}; init_window={:?}",
|
||||||
frame,
|
frame,
|
||||||
self.init_window_sz
|
self.init_window_sz
|
||||||
@@ -118,7 +118,7 @@ impl Send {
|
|||||||
counts: &mut Counts,
|
counts: &mut Counts,
|
||||||
task: &mut Option<Waker>,
|
task: &mut Option<Waker>,
|
||||||
) -> Result<(), UserError> {
|
) -> Result<(), UserError> {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"send_headers; frame={:?}; init_window={:?}",
|
"send_headers; frame={:?}; init_window={:?}",
|
||||||
frame,
|
frame,
|
||||||
self.init_window_sz
|
self.init_window_sz
|
||||||
@@ -167,7 +167,7 @@ impl Send {
|
|||||||
let is_closed = stream.state.is_closed();
|
let is_closed = stream.state.is_closed();
|
||||||
let is_empty = stream.pending_send.is_empty();
|
let is_empty = stream.pending_send.is_empty();
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"send_reset(..., reason={:?}, stream={:?}, ..., \
|
"send_reset(..., reason={:?}, stream={:?}, ..., \
|
||||||
is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
|
is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
|
||||||
state={:?} \
|
state={:?} \
|
||||||
@@ -182,7 +182,7 @@ impl Send {
|
|||||||
|
|
||||||
if is_reset {
|
if is_reset {
|
||||||
// Don't double reset
|
// Don't double reset
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
" -> not sending RST_STREAM ({:?} is already reset)",
|
" -> not sending RST_STREAM ({:?} is already reset)",
|
||||||
stream.id
|
stream.id
|
||||||
);
|
);
|
||||||
@@ -195,7 +195,7 @@ impl Send {
|
|||||||
// If closed AND the send queue is flushed, then the stream cannot be
|
// If closed AND the send queue is flushed, then the stream cannot be
|
||||||
// reset explicitly, either. Implicit resets can still be queued.
|
// reset explicitly, either. Implicit resets can still be queued.
|
||||||
if is_closed && is_empty {
|
if is_closed && is_empty {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
" -> not sending explicit RST_STREAM ({:?} was closed \
|
" -> not sending explicit RST_STREAM ({:?} was closed \
|
||||||
and send queue was flushed)",
|
and send queue was flushed)",
|
||||||
stream.id
|
stream.id
|
||||||
@@ -211,7 +211,7 @@ impl Send {
|
|||||||
|
|
||||||
let frame = frame::Reset::new(stream.id, reason);
|
let frame = frame::Reset::new(stream.id, reason);
|
||||||
|
|
||||||
log::trace!("send_reset -- queueing; frame={:?}", frame);
|
tracing::trace!("send_reset -- queueing; frame={:?}", frame);
|
||||||
self.prioritize
|
self.prioritize
|
||||||
.queue_frame(frame.into(), buffer, stream, task);
|
.queue_frame(frame.into(), buffer, stream, task);
|
||||||
self.prioritize.reclaim_all_capacity(stream, counts);
|
self.prioritize.reclaim_all_capacity(stream, counts);
|
||||||
@@ -269,7 +269,7 @@ impl Send {
|
|||||||
|
|
||||||
stream.state.send_close();
|
stream.state.send_close();
|
||||||
|
|
||||||
log::trace!("send_trailers -- queuing; frame={:?}", frame);
|
tracing::trace!("send_trailers -- queuing; frame={:?}", frame);
|
||||||
self.prioritize
|
self.prioritize
|
||||||
.queue_frame(frame.into(), buffer, stream, task);
|
.queue_frame(frame.into(), buffer, stream, task);
|
||||||
|
|
||||||
@@ -370,7 +370,7 @@ impl Send {
|
|||||||
task: &mut Option<Waker>,
|
task: &mut Option<Waker>,
|
||||||
) -> Result<(), Reason> {
|
) -> Result<(), Reason> {
|
||||||
if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
|
if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
|
||||||
log::debug!("recv_stream_window_update !!; err={:?}", e);
|
tracing::debug!("recv_stream_window_update !!; err={:?}", e);
|
||||||
|
|
||||||
self.send_reset(Reason::FLOW_CONTROL_ERROR, buffer, stream, counts, task);
|
self.send_reset(Reason::FLOW_CONTROL_ERROR, buffer, stream, counts, task);
|
||||||
|
|
||||||
@@ -443,7 +443,7 @@ impl Send {
|
|||||||
if val < old_val {
|
if val < old_val {
|
||||||
// We must decrease the (remote) window on every open stream.
|
// We must decrease the (remote) window on every open stream.
|
||||||
let dec = old_val - val;
|
let dec = old_val - val;
|
||||||
log::trace!("decrementing all windows; dec={}", dec);
|
tracing::trace!("decrementing all windows; dec={}", dec);
|
||||||
|
|
||||||
let mut total_reclaimed = 0;
|
let mut total_reclaimed = 0;
|
||||||
store.for_each(|mut stream| {
|
store.for_each(|mut stream| {
|
||||||
@@ -469,7 +469,7 @@ impl Send {
|
|||||||
0
|
0
|
||||||
};
|
};
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
|
"decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
|
||||||
stream.id,
|
stream.id,
|
||||||
dec,
|
dec,
|
||||||
|
|||||||
@@ -216,12 +216,12 @@ impl State {
|
|||||||
match self.inner {
|
match self.inner {
|
||||||
Open { local, .. } => {
|
Open { local, .. } => {
|
||||||
// The remote side will continue to receive data.
|
// The remote side will continue to receive data.
|
||||||
log::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
|
tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
|
||||||
self.inner = HalfClosedRemote(local);
|
self.inner = HalfClosedRemote(local);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
HalfClosedLocal(..) => {
|
HalfClosedLocal(..) => {
|
||||||
log::trace!("recv_close: HalfClosedLocal => Closed");
|
tracing::trace!("recv_close: HalfClosedLocal => Closed");
|
||||||
self.inner = Closed(Cause::EndStream);
|
self.inner = Closed(Cause::EndStream);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -257,7 +257,7 @@ impl State {
|
|||||||
// previous state with the received RST_STREAM, so that the queue
|
// previous state with the received RST_STREAM, so that the queue
|
||||||
// will be cleared by `Prioritize::pop_frame`.
|
// will be cleared by `Prioritize::pop_frame`.
|
||||||
state => {
|
state => {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"recv_reset; reason={:?}; state={:?}; queued={:?}",
|
"recv_reset; reason={:?}; state={:?}; queued={:?}",
|
||||||
reason,
|
reason,
|
||||||
state,
|
state,
|
||||||
@@ -275,7 +275,7 @@ impl State {
|
|||||||
match self.inner {
|
match self.inner {
|
||||||
Closed(..) => {}
|
Closed(..) => {}
|
||||||
_ => {
|
_ => {
|
||||||
log::trace!("recv_err; err={:?}", err);
|
tracing::trace!("recv_err; err={:?}", err);
|
||||||
self.inner = Closed(match *err {
|
self.inner = Closed(match *err {
|
||||||
Proto(reason) => Cause::LocallyReset(reason),
|
Proto(reason) => Cause::LocallyReset(reason),
|
||||||
Io(..) => Cause::Io,
|
Io(..) => Cause::Io,
|
||||||
@@ -288,7 +288,7 @@ impl State {
|
|||||||
match self.inner {
|
match self.inner {
|
||||||
Closed(..) => {}
|
Closed(..) => {}
|
||||||
s => {
|
s => {
|
||||||
log::trace!("recv_eof; state={:?}", s);
|
tracing::trace!("recv_eof; state={:?}", s);
|
||||||
self.inner = Closed(Cause::Io);
|
self.inner = Closed(Cause::Io);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -299,11 +299,11 @@ impl State {
|
|||||||
match self.inner {
|
match self.inner {
|
||||||
Open { remote, .. } => {
|
Open { remote, .. } => {
|
||||||
// The remote side will continue to receive data.
|
// The remote side will continue to receive data.
|
||||||
log::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
|
tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
|
||||||
self.inner = HalfClosedLocal(remote);
|
self.inner = HalfClosedLocal(remote);
|
||||||
}
|
}
|
||||||
HalfClosedRemote(..) => {
|
HalfClosedRemote(..) => {
|
||||||
log::trace!("send_close: HalfClosedRemote => Closed");
|
tracing::trace!("send_close: HalfClosedRemote => Closed");
|
||||||
self.inner = Closed(Cause::EndStream);
|
self.inner = Closed(Cause::EndStream);
|
||||||
}
|
}
|
||||||
state => panic!("send_close: unexpected state {:?}", state),
|
state => panic!("send_close: unexpected state {:?}", state),
|
||||||
|
|||||||
@@ -244,10 +244,10 @@ where
|
|||||||
///
|
///
|
||||||
/// If the stream is already contained by the list, return `false`.
|
/// If the stream is already contained by the list, return `false`.
|
||||||
pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
|
pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
|
||||||
log::trace!("Queue::push");
|
tracing::trace!("Queue::push");
|
||||||
|
|
||||||
if N::is_queued(stream) {
|
if N::is_queued(stream) {
|
||||||
log::trace!(" -> already queued");
|
tracing::trace!(" -> already queued");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -259,7 +259,7 @@ where
|
|||||||
// Queue the stream
|
// Queue the stream
|
||||||
match self.indices {
|
match self.indices {
|
||||||
Some(ref mut idxs) => {
|
Some(ref mut idxs) => {
|
||||||
log::trace!(" -> existing entries");
|
tracing::trace!(" -> existing entries");
|
||||||
|
|
||||||
// Update the current tail node to point to `stream`
|
// Update the current tail node to point to `stream`
|
||||||
let key = stream.key();
|
let key = stream.key();
|
||||||
@@ -269,7 +269,7 @@ where
|
|||||||
idxs.tail = stream.key();
|
idxs.tail = stream.key();
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
log::trace!(" -> first entry");
|
tracing::trace!(" -> first entry");
|
||||||
self.indices = Some(store::Indices {
|
self.indices = Some(store::Indices {
|
||||||
head: stream.key(),
|
head: stream.key(),
|
||||||
tail: stream.key(),
|
tail: stream.key(),
|
||||||
|
|||||||
@@ -265,7 +265,7 @@ impl Stream {
|
|||||||
self.send_capacity_inc = true;
|
self.send_capacity_inc = true;
|
||||||
self.send_flow.assign_capacity(capacity);
|
self.send_flow.assign_capacity(capacity);
|
||||||
|
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
" assigned capacity to stream; available={}; buffered={}; id={:?}",
|
" assigned capacity to stream; available={}; buffered={}; id={:?}",
|
||||||
self.send_flow.available(),
|
self.send_flow.available(),
|
||||||
self.buffered_send_data,
|
self.buffered_send_data,
|
||||||
@@ -274,7 +274,7 @@ impl Stream {
|
|||||||
|
|
||||||
// Only notify if the capacity exceeds the amount of buffered data
|
// Only notify if the capacity exceeds the amount of buffered data
|
||||||
if self.send_flow.available() > self.buffered_send_data {
|
if self.send_flow.available() > self.buffered_send_data {
|
||||||
log::trace!(" notifying task");
|
tracing::trace!(" notifying task");
|
||||||
self.notify_send();
|
self.notify_send();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -135,7 +135,7 @@ where
|
|||||||
// The GOAWAY process has begun. All streams with a greater ID than
|
// The GOAWAY process has begun. All streams with a greater ID than
|
||||||
// specified as part of GOAWAY should be ignored.
|
// specified as part of GOAWAY should be ignored.
|
||||||
if id > me.actions.recv.max_stream_id() {
|
if id > me.actions.recv.max_stream_id() {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"id ({:?}) > max_stream_id ({:?}), ignoring HEADERS",
|
"id ({:?}) > max_stream_id ({:?}), ignoring HEADERS",
|
||||||
id,
|
id,
|
||||||
me.actions.recv.max_stream_id()
|
me.actions.recv.max_stream_id()
|
||||||
@@ -155,7 +155,7 @@ where
|
|||||||
// This may be response headers for a stream we've already
|
// This may be response headers for a stream we've already
|
||||||
// forgotten about...
|
// forgotten about...
|
||||||
if me.actions.may_have_forgotten_stream::<P>(id) {
|
if me.actions.may_have_forgotten_stream::<P>(id) {
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
"recv_headers for old stream={:?}, sending STREAM_CLOSED",
|
"recv_headers for old stream={:?}, sending STREAM_CLOSED",
|
||||||
id,
|
id,
|
||||||
);
|
);
|
||||||
@@ -187,7 +187,7 @@ where
|
|||||||
// Locally reset streams must ignore frames "for some time".
|
// Locally reset streams must ignore frames "for some time".
|
||||||
// This is because the remote may have sent trailers before
|
// This is because the remote may have sent trailers before
|
||||||
// receiving the RST_STREAM frame.
|
// receiving the RST_STREAM frame.
|
||||||
log::trace!("recv_headers; ignoring trailers on {:?}", stream.id);
|
tracing::trace!("recv_headers; ignoring trailers on {:?}", stream.id);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -196,7 +196,7 @@ where
|
|||||||
let send_buffer = &mut *send_buffer;
|
let send_buffer = &mut *send_buffer;
|
||||||
|
|
||||||
me.counts.transition(stream, |counts, stream| {
|
me.counts.transition(stream, |counts, stream| {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"recv_headers; stream={:?}; state={:?}",
|
"recv_headers; stream={:?}; state={:?}",
|
||||||
stream.id,
|
stream.id,
|
||||||
stream.state
|
stream.state
|
||||||
@@ -259,7 +259,7 @@ where
|
|||||||
// The GOAWAY process has begun. All streams with a greater ID
|
// The GOAWAY process has begun. All streams with a greater ID
|
||||||
// than specified as part of GOAWAY should be ignored.
|
// than specified as part of GOAWAY should be ignored.
|
||||||
if id > me.actions.recv.max_stream_id() {
|
if id > me.actions.recv.max_stream_id() {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"id ({:?}) > max_stream_id ({:?}), ignoring DATA",
|
"id ({:?}) > max_stream_id ({:?}), ignoring DATA",
|
||||||
id,
|
id,
|
||||||
me.actions.recv.max_stream_id()
|
me.actions.recv.max_stream_id()
|
||||||
@@ -268,7 +268,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
if me.actions.may_have_forgotten_stream::<P>(id) {
|
if me.actions.may_have_forgotten_stream::<P>(id) {
|
||||||
log::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,);
|
tracing::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,);
|
||||||
|
|
||||||
let sz = frame.payload().len();
|
let sz = frame.payload().len();
|
||||||
// This should have been enforced at the codec::FramedRead layer, so
|
// This should have been enforced at the codec::FramedRead layer, so
|
||||||
@@ -322,7 +322,7 @@ where
|
|||||||
// The GOAWAY process has begun. All streams with a greater ID than
|
// The GOAWAY process has begun. All streams with a greater ID than
|
||||||
// specified as part of GOAWAY should be ignored.
|
// specified as part of GOAWAY should be ignored.
|
||||||
if id > me.actions.recv.max_stream_id() {
|
if id > me.actions.recv.max_stream_id() {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM",
|
"id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM",
|
||||||
id,
|
id,
|
||||||
me.actions.recv.max_stream_id()
|
me.actions.recv.max_stream_id()
|
||||||
@@ -470,7 +470,7 @@ where
|
|||||||
// The GOAWAY process has begun. All streams with a greater ID
|
// The GOAWAY process has begun. All streams with a greater ID
|
||||||
// than specified as part of GOAWAY should be ignored.
|
// than specified as part of GOAWAY should be ignored.
|
||||||
if id > me.actions.recv.max_stream_id() {
|
if id > me.actions.recv.max_stream_id() {
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE",
|
"id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE",
|
||||||
id,
|
id,
|
||||||
me.actions.recv.max_stream_id()
|
me.actions.recv.max_stream_id()
|
||||||
@@ -563,7 +563,7 @@ where
|
|||||||
me.refs += 1;
|
me.refs += 1;
|
||||||
key.map(|key| {
|
key.map(|key| {
|
||||||
let stream = &mut me.store.resolve(key);
|
let stream = &mut me.store.resolve(key);
|
||||||
log::trace!(
|
tracing::trace!(
|
||||||
"next_incoming; id={:?}, state={:?}",
|
"next_incoming; id={:?}, state={:?}",
|
||||||
stream.id,
|
stream.id,
|
||||||
stream.state
|
stream.state
|
||||||
@@ -788,7 +788,7 @@ where
|
|||||||
|
|
||||||
if let Some(pending) = pending {
|
if let Some(pending) = pending {
|
||||||
let mut stream = me.store.resolve(pending.key);
|
let mut stream = me.store.resolve(pending.key);
|
||||||
log::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
|
tracing::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
|
||||||
if stream.is_pending_open {
|
if stream.is_pending_open {
|
||||||
stream.wait_send(cx);
|
stream.wait_send(cx);
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
@@ -818,7 +818,7 @@ where
|
|||||||
actions.conn_error = Some(io::Error::from(io::ErrorKind::BrokenPipe).into());
|
actions.conn_error = Some(io::Error::from(io::ErrorKind::BrokenPipe).into());
|
||||||
}
|
}
|
||||||
|
|
||||||
log::trace!("Streams::recv_eof");
|
tracing::trace!("Streams::recv_eof");
|
||||||
|
|
||||||
me.store
|
me.store
|
||||||
.for_each(|stream| {
|
.for_each(|stream| {
|
||||||
@@ -1265,7 +1265,7 @@ fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
|
|||||||
Ok(inner) => inner,
|
Ok(inner) => inner,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
if ::std::thread::panicking() {
|
if ::std::thread::panicking() {
|
||||||
log::trace!("StreamRef::drop; mutex poisoned");
|
tracing::trace!("StreamRef::drop; mutex poisoned");
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
panic!("StreamRef::drop; mutex poisoned");
|
panic!("StreamRef::drop; mutex poisoned");
|
||||||
@@ -1277,7 +1277,7 @@ fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
|
|||||||
me.refs -= 1;
|
me.refs -= 1;
|
||||||
let mut stream = me.store.resolve(key);
|
let mut stream = me.store.resolve(key);
|
||||||
|
|
||||||
log::trace!("drop_stream_ref; stream={:?}", stream);
|
tracing::trace!("drop_stream_ref; stream={:?}", stream);
|
||||||
|
|
||||||
// decrement the stream's ref count by 1.
|
// decrement the stream's ref count by 1.
|
||||||
stream.ref_dec();
|
stream.ref_dec();
|
||||||
|
|||||||
@@ -402,7 +402,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let Some(inner) = self.connection.next_incoming() {
|
if let Some(inner) = self.connection.next_incoming() {
|
||||||
log::trace!("received incoming");
|
tracing::trace!("received incoming");
|
||||||
let (head, _) = inner.take_request().into_parts();
|
let (head, _) = inner.take_request().into_parts();
|
||||||
let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
|
let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
|
||||||
|
|
||||||
@@ -1179,7 +1179,7 @@ where
|
|||||||
type Output = Result<Connection<T, B>, crate::Error>;
|
type Output = Result<Connection<T, B>, crate::Error>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
log::trace!("Handshake::poll(); state={:?};", self.state);
|
tracing::trace!("Handshake::poll(); state={:?};", self.state);
|
||||||
use crate::server::Handshaking::*;
|
use crate::server::Handshaking::*;
|
||||||
|
|
||||||
self.state = if let Flushing(ref mut flush) = self.state {
|
self.state = if let Flushing(ref mut flush) = self.state {
|
||||||
@@ -1188,11 +1188,11 @@ where
|
|||||||
// for the client preface.
|
// for the client preface.
|
||||||
let codec = match Pin::new(flush).poll(cx)? {
|
let codec = match Pin::new(flush).poll(cx)? {
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
log::trace!("Handshake::poll(); flush.poll()=Pending");
|
tracing::trace!("Handshake::poll(); flush.poll()=Pending");
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
}
|
}
|
||||||
Poll::Ready(flushed) => {
|
Poll::Ready(flushed) => {
|
||||||
log::trace!("Handshake::poll(); flush.poll()=Ready");
|
tracing::trace!("Handshake::poll(); flush.poll()=Ready");
|
||||||
flushed
|
flushed
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -1229,7 +1229,7 @@ where
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
log::trace!("Handshake::poll(); connection established!");
|
tracing::trace!("Handshake::poll(); connection established!");
|
||||||
let mut c = Connection { connection };
|
let mut c = Connection { connection };
|
||||||
if let Some(sz) = self.builder.initial_target_connection_window_size {
|
if let Some(sz) = self.builder.initial_target_connection_window_size {
|
||||||
c.set_target_window_size(sz);
|
c.set_target_window_size(sz);
|
||||||
@@ -1289,12 +1289,12 @@ impl Peer {
|
|||||||
if let Err(e) = frame::PushPromise::validate_request(&request) {
|
if let Err(e) = frame::PushPromise::validate_request(&request) {
|
||||||
use PushPromiseHeaderError::*;
|
use PushPromiseHeaderError::*;
|
||||||
match e {
|
match e {
|
||||||
NotSafeAndCacheable => log::debug!(
|
NotSafeAndCacheable => tracing::debug!(
|
||||||
"convert_push_message: method {} is not safe and cacheable; promised_id={:?}",
|
"convert_push_message: method {} is not safe and cacheable; promised_id={:?}",
|
||||||
request.method(),
|
request.method(),
|
||||||
promised_id,
|
promised_id,
|
||||||
),
|
),
|
||||||
InvalidContentLength(e) => log::debug!(
|
InvalidContentLength(e) => tracing::debug!(
|
||||||
"convert_push_message; promised request has invalid content-length {:?}; promised_id={:?}",
|
"convert_push_message; promised request has invalid content-length {:?}; promised_id={:?}",
|
||||||
e,
|
e,
|
||||||
promised_id,
|
promised_id,
|
||||||
@@ -1347,7 +1347,7 @@ impl proto::Peer for Peer {
|
|||||||
|
|
||||||
macro_rules! malformed {
|
macro_rules! malformed {
|
||||||
($($arg:tt)*) => {{
|
($($arg:tt)*) => {{
|
||||||
log::debug!($($arg)*);
|
tracing::debug!($($arg)*);
|
||||||
return Err(RecvError::Stream {
|
return Err(RecvError::Stream {
|
||||||
id: stream_id,
|
id: stream_id,
|
||||||
reason: Reason::PROTOCOL_ERROR,
|
reason: Reason::PROTOCOL_ERROR,
|
||||||
@@ -1367,7 +1367,7 @@ impl proto::Peer for Peer {
|
|||||||
|
|
||||||
// Specifying :status for a request is a protocol error
|
// Specifying :status for a request is a protocol error
|
||||||
if pseudo.status.is_some() {
|
if pseudo.status.is_some() {
|
||||||
log::trace!("malformed headers: :status field on request; PROTOCOL_ERROR");
|
tracing::trace!("malformed headers: :status field on request; PROTOCOL_ERROR");
|
||||||
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
|
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,6 +9,6 @@ edition = "2018"
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
h2-support = { path = "../h2-support" }
|
h2-support = { path = "../h2-support" }
|
||||||
log = "0.4.1"
|
tracing = "0.1.13"
|
||||||
futures = { version = "0.3", default-features = false, features = ["alloc"] }
|
futures = { version = "0.3", default-features = false, features = ["alloc"] }
|
||||||
tokio = { version = "0.2", features = ["macros", "tcp"] }
|
tokio = { version = "0.2", features = ["macros", "tcp"] }
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ async fn handshake() {
|
|||||||
|
|
||||||
let (_client, h2) = client::handshake(mock).await.unwrap();
|
let (_client, h2) = client::handshake(mock).await.unwrap();
|
||||||
|
|
||||||
log::trace!("hands have been shook");
|
tracing::trace!("hands have been shook");
|
||||||
|
|
||||||
// At this point, the connection should be closed
|
// At this point, the connection should be closed
|
||||||
h2.await.unwrap();
|
h2.await.unwrap();
|
||||||
@@ -84,7 +84,7 @@ async fn recv_invalid_server_stream_id() {
|
|||||||
.body(())
|
.body(())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
log::info!("sending request");
|
tracing::info!("sending request");
|
||||||
let (response, _) = client.send_request(request, true).unwrap();
|
let (response, _) = client.send_request(request, true).unwrap();
|
||||||
|
|
||||||
// The connection errors
|
// The connection errors
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ async fn send_recv_headers_only() {
|
|||||||
.body(())
|
.body(())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
log::info!("sending request");
|
tracing::info!("sending request");
|
||||||
let (response, _) = client.send_request(request, true).unwrap();
|
let (response, _) = client.send_request(request, true).unwrap();
|
||||||
|
|
||||||
let resp = h2.run(response).await.unwrap();
|
let resp = h2.run(response).await.unwrap();
|
||||||
@@ -72,7 +72,7 @@ async fn send_recv_data() {
|
|||||||
.body(())
|
.body(())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
log::info!("sending request");
|
tracing::info!("sending request");
|
||||||
let (response, mut stream) = client.send_request(request, false).unwrap();
|
let (response, mut stream) = client.send_request(request, false).unwrap();
|
||||||
|
|
||||||
// Reserve send capacity
|
// Reserve send capacity
|
||||||
@@ -129,7 +129,7 @@ async fn send_headers_recv_data_single_frame() {
|
|||||||
.body(())
|
.body(())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
log::info!("sending request");
|
tracing::info!("sending request");
|
||||||
let (response, _) = client.send_request(request, true).unwrap();
|
let (response, _) = client.send_request(request, true).unwrap();
|
||||||
|
|
||||||
let resp = h2.run(response).await.unwrap();
|
let resp = h2.run(response).await.unwrap();
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ async fn recv_trailers_only() {
|
|||||||
.body(())
|
.body(())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
log::info!("sending request");
|
tracing::info!("sending request");
|
||||||
let (response, _) = client.send_request(request, true).unwrap();
|
let (response, _) = client.send_request(request, true).unwrap();
|
||||||
|
|
||||||
let response = h2.run(response).await.unwrap();
|
let response = h2.run(response).await.unwrap();
|
||||||
@@ -79,7 +79,7 @@ async fn send_trailers_immediately() {
|
|||||||
.body(())
|
.body(())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
log::info!("sending request");
|
tracing::info!("sending request");
|
||||||
let (response, mut stream) = client.send_request(request, false).unwrap();
|
let (response, mut stream) = client.send_request(request, false).unwrap();
|
||||||
|
|
||||||
let mut trailers = HeaderMap::new();
|
let mut trailers = HeaderMap::new();
|
||||||
|
|||||||
Reference in New Issue
Block a user