From fc7f63f641210c52f8e742c1538c4aa455c795ab Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 17 Aug 2020 17:29:22 -0700 Subject: [PATCH] start adding `tracing` spans to internals (#478) We've adopted `tracing` for diagnostics, but currently, it is just being used as a drop-in replacement for the `log` crate. Ideally, we would want to start emitting more structured diagnostics, using `tracing`'s `Span`s and structured key-value fields. A lot of the logging in `h2` is already written in a style that imitates the formatting of structured key-value logs, but as textual log messages. Migrating the logs to structured `tracing` events therefore is pretty easy to do. I've also started adding spans, mostly in the read path. Finally, I've updated the tests to use `tracing` rather than `env_logger`. The tracing setup happens in a macro, so that a span for each test with the test's name can be generated and entered. This will make the test output easier to read if multiple tests are run concurrently with `--nocapture`. Signed-off-by: Eliza Weisman --- Cargo.toml | 1 + src/client.rs | 8 +- src/codec/framed_read.rs | 10 +- src/codec/framed_write.rs | 21 +-- src/hpack/decoder.rs | 25 ++-- src/hpack/encoder.rs | 4 + src/hpack/table.rs | 2 +- src/proto/connection.rs | 40 ++++-- src/proto/peer.rs | 1 + src/proto/streams/prioritize.rs | 169 ++++++++++++------------- src/server.rs | 42 ++++-- tests/h2-support/Cargo.toml | 3 +- tests/h2-support/src/lib.rs | 17 +++ tests/h2-support/src/prelude.rs | 2 +- tests/h2-support/src/trace.rs | 41 ++++++ tests/h2-tests/tests/client_request.rs | 44 +++---- tests/h2-tests/tests/codec_read.rs | 4 +- tests/h2-tests/tests/codec_write.rs | 6 +- tests/h2-tests/tests/flow_control.rs | 52 ++++---- tests/h2-tests/tests/ping_pong.rs | 10 +- tests/h2-tests/tests/prioritization.rs | 12 +- tests/h2-tests/tests/push_promise.rs | 16 +-- tests/h2-tests/tests/server.rs | 44 +++---- tests/h2-tests/tests/stream_states.rs | 42 +++--- tests/h2-tests/tests/trailers.rs | 4 +- 25 files changed, 363 insertions(+), 257 deletions(-) create mode 100644 tests/h2-support/src/trace.rs diff --git a/Cargo.toml b/Cargo.toml index b9e2488..a6e8200 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ tokio = { version = "0.2", features = ["io-util"] } bytes = "0.5.2" http = "0.2" tracing = { version = "0.1.13", default-features = false, features = ["std", "log"] } +tracing-futures = { version = "0.2", default-features = false, features = ["std-future"]} fnv = "1.0.5" slab = "0.4.0" indexmap = "1.0" diff --git a/src/client.rs b/src/client.rs index 597eb3d..1233f46 100644 --- a/src/client.rs +++ b/src/client.rs @@ -149,6 +149,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use std::usize; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tracing_futures::Instrument; /// Initializes new HTTP/2.0 streams on a connection by sending a request. /// @@ -1115,7 +1116,10 @@ where T: AsyncRead + AsyncWrite + Unpin, { let builder = Builder::new(); - builder.handshake(io).await + builder + .handshake(io) + .instrument(tracing::trace_span!("client_handshake", io = %std::any::type_name::())) + .await } // ===== impl Connection ===== @@ -1438,6 +1442,8 @@ impl Peer { impl proto::Peer for Peer { type Poll = Response<()>; + const NAME: &'static str = "Client"; + fn r#dyn() -> proto::DynPeer { proto::DynPeer::Client } diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index 2674e39..8bba125 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -61,6 +61,8 @@ impl FramedRead { fn decode_frame(&mut self, mut bytes: BytesMut) -> Result, RecvError> { use self::RecvError::*; + let span = tracing::trace_span!("FramedRead::decode_frame", offset = bytes.len()); + let _e = span.enter(); tracing::trace!("decoding frame from {}B", bytes.len()); @@ -74,7 +76,7 @@ impl FramedRead { let kind = head.kind(); - tracing::trace!(" -> kind={:?}", kind); + tracing::trace!(frame.kind = ?kind); macro_rules! header_block { ($frame:ident, $head:ident, $bytes:ident) => ({ @@ -338,6 +340,8 @@ where type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let span = tracing::trace_span!("FramedRead::poll_next"); + let _e = span.enter(); loop { tracing::trace!("poll"); let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) { @@ -346,9 +350,9 @@ where None => return Poll::Ready(None), }; - tracing::trace!("poll; bytes={}B", bytes.len()); + tracing::trace!(read.bytes = bytes.len()); if let Some(frame) = self.decode_frame(bytes)? { - tracing::debug!("received; frame={:?}", frame); + tracing::debug!(?frame, "received"); return Poll::Ready(Some(Ok(frame))); } } diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index 47ee592..201bba2 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -105,8 +105,10 @@ where pub fn buffer(&mut self, item: Frame) -> Result<(), UserError> { // Ensure that we have enough capacity to accept the write. assert!(self.has_capacity()); + let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item); + let _e = span.enter(); - tracing::debug!("send; frame={:?}", item); + tracing::debug!(frame = ?item, "send"); match item { Frame::Data(mut v) => { @@ -150,19 +152,19 @@ where } Frame::Settings(v) => { v.encode(self.buf.get_mut()); - tracing::trace!("encoded settings; rem={:?}", self.buf.remaining()); + tracing::trace!(rem = self.buf.remaining(), "encoded settings"); } Frame::GoAway(v) => { v.encode(self.buf.get_mut()); - tracing::trace!("encoded go_away; rem={:?}", self.buf.remaining()); + tracing::trace!(rem = self.buf.remaining(), "encoded go_away"); } Frame::Ping(v) => { v.encode(self.buf.get_mut()); - tracing::trace!("encoded ping; rem={:?}", self.buf.remaining()); + tracing::trace!(rem = self.buf.remaining(), "encoded ping"); } Frame::WindowUpdate(v) => { v.encode(self.buf.get_mut()); - tracing::trace!("encoded window_update; rem={:?}", self.buf.remaining()); + tracing::trace!(rem = self.buf.remaining(), "encoded window_update"); } Frame::Priority(_) => { @@ -174,7 +176,7 @@ where } Frame::Reset(v) => { v.encode(self.buf.get_mut()); - tracing::trace!("encoded reset; rem={:?}", self.buf.remaining()); + tracing::trace!(rem = self.buf.remaining(), "encoded reset"); } } @@ -183,18 +185,19 @@ where /// Flush buffered data to the wire pub fn flush(&mut self, cx: &mut Context) -> Poll> { - tracing::trace!("flush"); + let span = tracing::trace_span!("FramedWrite::flush"); + let _e = span.enter(); loop { while !self.is_empty() { match self.next { Some(Next::Data(ref mut frame)) => { - tracing::trace!(" -> queued data frame"); + tracing::trace!(queued_data_frame = true); let mut buf = (&mut self.buf).chain(frame.payload_mut()); ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut buf))?; } _ => { - tracing::trace!(" -> not a queued data frame"); + tracing::trace!(queued_data_frame = false); ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut self.buf))?; } } diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index 3009e30..aba673d 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -183,6 +183,9 @@ impl Decoder { self.last_max_update = size; } + let span = tracing::trace_span!("hpack::decode"); + let _e = span.enter(); + tracing::trace!("decode"); while let Some(ty) = peek_u8(src) { @@ -191,14 +194,14 @@ impl Decoder { // determined from the first byte. match Representation::load(ty)? { Indexed => { - tracing::trace!(" Indexed; rem={:?}", src.remaining()); + tracing::trace!(rem = src.remaining(), kind = %"Indexed"); can_resize = false; let entry = self.decode_indexed(src)?; consume(src); f(entry); } LiteralWithIndexing => { - tracing::trace!(" LiteralWithIndexing; rem={:?}", src.remaining()); + tracing::trace!(rem = src.remaining(), kind = %"LiteralWithIndexing"); can_resize = false; let entry = self.decode_literal(src, true)?; @@ -209,14 +212,14 @@ impl Decoder { f(entry); } LiteralWithoutIndexing => { - tracing::trace!(" LiteralWithoutIndexing; rem={:?}", src.remaining()); + tracing::trace!(rem = src.remaining(), kind = %"LiteralWithoutIndexing"); can_resize = false; let entry = self.decode_literal(src, false)?; consume(src); f(entry); } LiteralNeverIndexed => { - tracing::trace!(" LiteralNeverIndexed; rem={:?}", src.remaining()); + tracing::trace!(rem = src.remaining(), kind = %"LiteralNeverIndexed"); can_resize = false; let entry = self.decode_literal(src, false)?; consume(src); @@ -226,7 +229,7 @@ impl Decoder { f(entry); } SizeUpdate => { - tracing::trace!(" SizeUpdate; rem={:?}", src.remaining()); + tracing::trace!(rem = src.remaining(), kind = %"SizeUpdate"); if !can_resize { return Err(DecoderError::InvalidMaxDynamicSize); } @@ -249,9 +252,9 @@ impl Decoder { } tracing::debug!( - "Decoder changed max table size from {} to {}", - self.table.size(), - new_size + from = self.table.size(), + to = new_size, + "Decoder changed max table size" ); self.table.set_max_size(new_size); @@ -302,11 +305,7 @@ impl Decoder { let len = decode_int(buf, 7)?; if len > buf.remaining() { - tracing::trace!( - "decode_string underflow; len={}; remaining={}", - len, - buf.remaining() - ); + tracing::trace!(len, remaining = buf.remaining(), "decode_string underflow",); return Err(DecoderError::NeedMore(NeedMore::StringUnderflow)); } diff --git a/src/hpack/encoder.rs b/src/hpack/encoder.rs index ef17748..e6881dd 100644 --- a/src/hpack/encoder.rs +++ b/src/hpack/encoder.rs @@ -86,7 +86,11 @@ impl Encoder { where I: Iterator>>, { + let span = tracing::trace_span!("hpack::encode"); + let _e = span.enter(); + let pos = position(dst); + tracing::trace!(pos, "encoding at"); if let Err(e) = self.encode_size_updates(dst) { if e == EncoderError::BufferOverflow { diff --git a/src/hpack/table.rs b/src/hpack/table.rs index e7c8ce7..2328743 100644 --- a/src/hpack/table.rs +++ b/src/hpack/table.rs @@ -597,7 +597,7 @@ impl Table { } assert!(dist <= their_dist, - "could not find entry; actual={}; desired={};" + + "could not find entry; actual={}; desired={}" + "probe={}, dist={}; their_dist={}; index={}; msg={}", actual, desired, probe, dist, their_dist, index.wrapping_sub(self.inserted), msg); diff --git a/src/proto/connection.rs b/src/proto/connection.rs index c9e33f4..ffa2945 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -44,6 +44,9 @@ where /// Stream state handler streams: Streams, + /// A `tracing` span tracking the lifetime of the connection. + span: tracing::Span, + /// Client or server _phantom: PhantomData

, } @@ -100,6 +103,7 @@ where ping_pong: PingPong::new(), settings: Settings::new(config.settings), streams, + span: tracing::debug_span!("Connection", peer = %P::NAME), _phantom: PhantomData, } } @@ -121,6 +125,9 @@ where /// Returns `RecvError` as this may raise errors that are caused by delayed /// processing of received frames. fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + let _e = self.span.enter(); + let span = tracing::trace_span!("poll_ready"); + let _e = span.enter(); // The order of these calls don't really matter too much ready!(self.ping_pong.send_pending_pong(cx, &mut self.codec))?; ready!(self.ping_pong.send_pending_ping(cx, &mut self.codec))?; @@ -200,9 +207,18 @@ where /// Advances the internal state of the connection. pub fn poll(&mut self, cx: &mut Context) -> Poll> { + // XXX(eliza): cloning the span is unfortunately necessary here in + // order to placate the borrow checker — `self` is mutably borrowed by + // `poll2`, which means that we can't borrow `self.span` to enter it. + // The clone is just an atomic ref bump. + let span = self.span.clone(); + let _e = span.enter(); + let span = tracing::trace_span!("poll"); + let _e = span.enter(); use crate::codec::RecvError::*; loop { + tracing::trace!(connection.state = ?self.state); // TODO: probably clean up this glob of code match self.state { // When open, continue to poll a frame @@ -230,7 +246,7 @@ where // error. This is handled by setting a GOAWAY frame followed by // terminating the connection. Poll::Ready(Err(Connection(e))) => { - tracing::debug!("Connection::poll; connection error={:?}", e); + tracing::debug!(error = ?e, "Connection::poll; connection error"); // We may have already sent a GOAWAY for this error, // if so, don't send another, just flush and close up. @@ -250,7 +266,7 @@ where // This is handled by resetting the frame then trying to read // another frame. Poll::Ready(Err(Stream { id, reason })) => { - tracing::trace!("stream error; id={:?}; reason={:?}", id, reason); + tracing::trace!(?id, ?reason, "stream error"); self.streams.send_reset(id, reason); } // Attempting to read a frame resulted in an I/O error. All @@ -258,7 +274,7 @@ where // // TODO: Are I/O errors recoverable? Poll::Ready(Err(Io(e))) => { - tracing::debug!("Connection::poll; IO error={:?}", e); + tracing::debug!(error = ?e, "Connection::poll; IO error"); let e = e.into(); // Reset all active streams @@ -317,28 +333,28 @@ where match ready!(Pin::new(&mut self.codec).poll_next(cx)?) { Some(Headers(frame)) => { - tracing::trace!("recv HEADERS; frame={:?}", frame); + tracing::trace!(?frame, "recv HEADERS"); self.streams.recv_headers(frame)?; } Some(Data(frame)) => { - tracing::trace!("recv DATA; frame={:?}", frame); + tracing::trace!(?frame, "recv DATA"); self.streams.recv_data(frame)?; } Some(Reset(frame)) => { - tracing::trace!("recv RST_STREAM; frame={:?}", frame); + tracing::trace!(?frame, "recv RST_STREAM"); self.streams.recv_reset(frame)?; } Some(PushPromise(frame)) => { - tracing::trace!("recv PUSH_PROMISE; frame={:?}", frame); + tracing::trace!(?frame, "recv PUSH_PROMISE"); self.streams.recv_push_promise(frame)?; } Some(Settings(frame)) => { - tracing::trace!("recv SETTINGS; frame={:?}", frame); + tracing::trace!(?frame, "recv SETTINGS"); self.settings .recv_settings(frame, &mut self.codec, &mut self.streams)?; } Some(GoAway(frame)) => { - tracing::trace!("recv GOAWAY; frame={:?}", frame); + tracing::trace!(?frame, "recv GOAWAY"); // This should prevent starting new streams, // but should allow continuing to process current streams // until they are all EOS. Once they are, State should @@ -347,7 +363,7 @@ where self.error = Some(frame.reason()); } Some(Ping(frame)) => { - tracing::trace!("recv PING; frame={:?}", frame); + tracing::trace!(?frame, "recv PING"); let status = self.ping_pong.recv_ping(frame); if status.is_shutdown() { assert!( @@ -360,11 +376,11 @@ where } } Some(WindowUpdate(frame)) => { - tracing::trace!("recv WINDOW_UPDATE; frame={:?}", frame); + tracing::trace!(?frame, "recv WINDOW_UPDATE"); self.streams.recv_window_update(frame)?; } Some(Priority(frame)) => { - tracing::trace!("recv PRIORITY; frame={:?}", frame); + tracing::trace!(?frame, "recv PRIORITY"); // TODO: handle } None => { diff --git a/src/proto/peer.rs b/src/proto/peer.rs index 8d327fb..3bcc772 100644 --- a/src/proto/peer.rs +++ b/src/proto/peer.rs @@ -11,6 +11,7 @@ use std::fmt; pub(crate) trait Peer { /// Message type polled from the transport type Poll: fmt::Debug; + const NAME: &'static str; fn r#dyn() -> Dyn; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 180d936..9379820 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -104,6 +104,8 @@ impl Prioritize { stream: &mut store::Ptr, task: &mut Option, ) { + let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id); + let _e = span.enter(); // Queue the frame in the buffer stream.pending_send.push_back(buffer, frame); self.schedule_send(stream, task); @@ -112,7 +114,7 @@ impl Prioritize { pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option) { // If the stream is waiting to be opened, nothing more to do. if stream.is_send_ready() { - tracing::trace!("schedule_send; {:?}", stream.id); + tracing::trace!(?stream.id, "schedule_send"); // Queue the stream self.pending_send.push(stream); @@ -158,12 +160,10 @@ impl Prioritize { // Update the buffered data counter stream.buffered_send_data += sz; - tracing::trace!( - "send_data; sz={}; buffered={}; requested={}", - sz, - stream.buffered_send_data, - stream.requested_send_capacity - ); + let span = + tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity); + let _e = span.enter(); + tracing::trace!(buffered = stream.buffered_send_data); // Implicitly request more send capacity if not enough has been // requested yet. @@ -180,9 +180,8 @@ impl Prioritize { } tracing::trace!( - "send_data (2); available={}; buffered={}", - stream.send_flow.available(), - stream.buffered_send_data + available = %stream.send_flow.available(), + buffered = stream.buffered_send_data, ); // The `stream.buffered_send_data == 0` check is here so that, if a zero @@ -214,13 +213,14 @@ impl Prioritize { stream: &mut store::Ptr, counts: &mut Counts, ) { - tracing::trace!( - "reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}", - stream.id, - capacity, - capacity + stream.buffered_send_data, - stream.requested_send_capacity + let span = tracing::trace_span!( + "reserve_capacity", + ?stream.id, + requested = capacity, + effective = capacity + stream.buffered_send_data, + curr = stream.requested_send_capacity ); + let _e = span.enter(); // Actual capacity is `capacity` + the current amount of buffered data. // If it were less, then we could never send out the buffered data. @@ -266,13 +266,14 @@ impl Prioritize { inc: WindowSize, stream: &mut store::Ptr, ) -> Result<(), Reason> { - tracing::trace!( - "recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}", - stream.id, - stream.state, + let span = tracing::trace_span!( + "recv_stream_window_update", + ?stream.id, + ?stream.state, inc, - stream.send_flow + flow = ?stream.send_flow ); + let _e = span.enter(); if stream.state.is_send_closed() && stream.buffered_send_data == 0 { // We can't send any data, so don't bother doing anything else. @@ -324,9 +325,11 @@ impl Prioritize { } pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) { + let span = tracing::trace_span!("clear_pending_capacity"); + let _e = span.enter(); while let Some(stream) = self.pending_capacity.pop(store) { counts.transition(stream, |_, stream| { - tracing::trace!("clear_pending_capacity; stream={:?}", stream.id); + tracing::trace!(?stream.id, "clear_pending_capacity"); }) } } @@ -339,7 +342,8 @@ impl Prioritize { ) where R: Resolve, { - tracing::trace!("assign_connection_capacity; inc={}", inc); + let span = tracing::trace_span!("assign_connection_capacity", inc); + let _e = span.enter(); self.flow.assign_capacity(inc); @@ -382,15 +386,14 @@ impl Prioritize { // Can't assign more than what is available stream.send_flow.window_size() - stream.send_flow.available().as_size(), ); - + let span = tracing::trace_span!("try_assign_capacity", ?stream.id); + let _e = span.enter(); tracing::trace!( - "try_assign_capacity; stream={:?}, requested={}; additional={}; buffered={}; window={}; conn={}", - stream.id, - total_requested, + requested = total_requested, additional, - stream.buffered_send_data, - stream.send_flow.window_size(), - self.flow.available() + buffered = stream.buffered_send_data, + window = stream.send_flow.window_size(), + conn = %self.flow.available() ); if additional == 0 { @@ -416,7 +419,7 @@ impl Prioritize { // TODO: Should prioritization factor into this? let assign = cmp::min(conn_available, additional); - tracing::trace!(" assigning; stream={:?}, capacity={}", stream.id, assign,); + tracing::trace!(capacity = assign, "assigning"); // Assign the capacity to the stream stream.assign_capacity(assign); @@ -426,11 +429,10 @@ impl Prioritize { } tracing::trace!( - "try_assign_capacity(2); available={}; requested={}; buffered={}; has_unavailable={:?}", - stream.send_flow.available(), - stream.requested_send_capacity, - stream.buffered_send_data, - stream.send_flow.has_unavailable() + available = %stream.send_flow.available(), + requested = stream.requested_send_capacity, + buffered = stream.buffered_send_data, + has_unavailable = %stream.send_flow.has_unavailable() ); if stream.send_flow.available() < stream.requested_send_capacity @@ -492,7 +494,7 @@ impl Prioritize { match self.pop_frame(buffer, store, max_frame_len, counts) { Some(frame) => { - tracing::trace!("writing frame={:?}", frame); + tracing::trace!(?frame, "writing"); debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); if let Frame::Data(ref frame) = frame { @@ -538,14 +540,15 @@ impl Prioritize { where B: Buf, { - tracing::trace!("try reclaim frame"); + let span = tracing::trace_span!("try_reclaim_frame"); + let _e = span.enter(); // First check if there are any data chunks to take back if let Some(frame) = dst.take_last_data_frame() { tracing::trace!( - " -> reclaimed; frame={:?}; sz={}", - frame, - frame.payload().inner.get_ref().remaining() + ?frame, + sz = frame.payload().inner.get_ref().remaining(), + "reclaimed" ); let mut eos = false; @@ -603,11 +606,12 @@ impl Prioritize { } pub fn clear_queue(&mut self, buffer: &mut Buffer>, stream: &mut store::Ptr) { - tracing::trace!("clear_queue; stream={:?}", stream.id); + let span = tracing::trace_span!("clear_queue", ?stream.id); + let _e = span.enter(); // TODO: make this more efficient? while let Some(frame) = stream.pending_send.pop_front(buffer) { - tracing::trace!("dropping; frame={:?}", frame); + tracing::trace!(?frame, "dropping"); } stream.buffered_send_data = 0; @@ -644,16 +648,14 @@ impl Prioritize { where B: Buf, { - tracing::trace!("pop_frame"); + let span = tracing::trace_span!("pop_frame"); + let _e = span.enter(); loop { match self.pending_send.pop(store) { Some(mut stream) => { - tracing::trace!( - "pop_frame; stream={:?}; stream.state={:?}", - stream.id, - stream.state - ); + let span = tracing::trace_span!("popped", ?stream.id, ?stream.state); + let _e = span.enter(); // It's possible that this stream, besides having data to send, // is also queued to send a reset, and thus is already in the queue @@ -662,11 +664,7 @@ impl Prioritize { // To be safe, we just always ask the stream. let is_pending_reset = stream.is_pending_reset_expiration(); - tracing::trace!( - " --> stream={:?}; is_pending_reset={:?};", - stream.id, - is_pending_reset - ); + tracing::trace!(is_pending_reset); let frame = match stream.pending_send.pop_front(buffer) { Some(Frame::Data(mut frame)) => { @@ -676,24 +674,19 @@ impl Prioritize { let sz = frame.payload().remaining(); tracing::trace!( - " --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \ - available={}; requested={}; buffered={};", - frame.stream_id(), sz, - frame.is_end_stream(), - stream_capacity, - stream.send_flow.available(), - stream.requested_send_capacity, - stream.buffered_send_data, + eos = frame.is_end_stream(), + window = %stream_capacity, + available = %stream.send_flow.available(), + requested = stream.requested_send_capacity, + buffered = stream.buffered_send_data, + "data frame" ); // Zero length data frames always have capacity to // be sent. if sz > 0 && stream_capacity == 0 { - tracing::trace!( - " --> stream capacity is 0; requested={}", - stream.requested_send_capacity - ); + tracing::trace!("stream capacity is 0"); // Ensure that the stream is waiting for // connection level capacity @@ -721,34 +714,38 @@ impl Prioritize { // capacity at this point. debug_assert!(len <= self.flow.window_size()); - tracing::trace!(" --> sending data frame; len={}", len); + tracing::trace!(len, "sending data frame"); // Update the flow control - tracing::trace!(" -- updating stream flow --"); - stream.send_flow.send_data(len); + tracing::trace_span!("updating stream flow").in_scope(|| { + stream.send_flow.send_data(len); - // Decrement the stream's buffered data counter - debug_assert!(stream.buffered_send_data >= len); - stream.buffered_send_data -= len; - stream.requested_send_capacity -= len; + // Decrement the stream's buffered data counter + debug_assert!(stream.buffered_send_data >= len); + stream.buffered_send_data -= len; + stream.requested_send_capacity -= len; - // Assign the capacity back to the connection that - // was just consumed from the stream in the previous - // line. - self.flow.assign_capacity(len); + // Assign the capacity back to the connection that + // was just consumed from the stream in the previous + // line. + self.flow.assign_capacity(len); + }); - tracing::trace!(" -- updating connection flow --"); - self.flow.send_data(len); + let (eos, len) = tracing::trace_span!("updating connection flow") + .in_scope(|| { + self.flow.send_data(len); - // Wrap the frame's data payload to ensure that the - // correct amount of data gets written. + // Wrap the frame's data payload to ensure that the + // correct amount of data gets written. - let eos = frame.is_end_stream(); - let len = len as usize; + let eos = frame.is_end_stream(); + let len = len as usize; - if frame.payload().remaining() > len { - frame.set_end_stream(false); - } + if frame.payload().remaining() > len { + frame.set_end_stream(false); + } + (eos, len) + }); Frame::Data(frame.map(|buf| Prioritized { inner: buf.take(len), diff --git a/src/server.rs b/src/server.rs index 69ba16a..3c093f7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -128,6 +128,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use std::{convert, fmt, io, mem}; use tokio::io::{AsyncRead, AsyncWrite}; +use tracing_futures::{Instrument, Instrumented}; /// In progress HTTP/2.0 connection handshake future. /// @@ -149,6 +150,8 @@ pub struct Handshake { builder: Builder, /// The current state of the handshake. state: Handshaking, + /// Span tracking the handshake + span: tracing::Span, } /// Accepts inbound HTTP/2.0 streams on a connection. @@ -290,9 +293,9 @@ impl fmt::Debug for SendPushedResponse { /// Stages of an in-progress handshake. enum Handshaking { /// State 1. Connection is flushing pending SETTINGS frame. - Flushing(Flush>), + Flushing(Instrumented>>), /// State 2. Connection is waiting for the client preface. - ReadingPreface(ReadPreface>), + ReadingPreface(Instrumented>>), /// Dummy state for `mem::replace`. Empty, } @@ -359,6 +362,9 @@ where B: Buf + 'static, { fn handshake2(io: T, builder: Builder) -> Handshake { + let span = tracing::trace_span!("server_handshake", io = %std::any::type_name::()); + let entered = span.enter(); + // Create the codec. let mut codec = Codec::new(io); @@ -378,7 +384,13 @@ where // Create the handshake future. let state = Handshaking::from(codec); - Handshake { builder, state } + drop(entered); + + Handshake { + builder, + state, + span, + } } /// Accept the next incoming request on this connection. @@ -1179,7 +1191,9 @@ where type Output = Result, crate::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - tracing::trace!("Handshake::poll(); state={:?};", self.state); + let span = self.span.clone(); // XXX(eliza): T_T + let _e = span.enter(); + tracing::trace!(state = ?self.state); use crate::server::Handshaking::*; self.state = if let Flushing(ref mut flush) = self.state { @@ -1188,11 +1202,11 @@ where // for the client preface. let codec = match Pin::new(flush).poll(cx)? { Poll::Pending => { - tracing::trace!("Handshake::poll(); flush.poll()=Pending"); + tracing::trace!(flush.poll = %"Pending"); return Poll::Pending; } Poll::Ready(flushed) => { - tracing::trace!("Handshake::poll(); flush.poll()=Ready"); + tracing::trace!(flush.poll = %"Ready"); flushed } }; @@ -1229,7 +1243,7 @@ where }, ); - tracing::trace!("Handshake::poll(); connection established!"); + tracing::trace!("connection established!"); let mut c = Connection { connection }; if let Some(sz) = self.builder.initial_target_connection_window_size { c.set_target_window_size(sz); @@ -1290,14 +1304,14 @@ impl Peer { use PushPromiseHeaderError::*; match e { NotSafeAndCacheable => tracing::debug!( - "convert_push_message: method {} is not safe and cacheable; promised_id={:?}", + ?promised_id, + "convert_push_message: method {} is not safe and cacheable", request.method(), - promised_id, ), InvalidContentLength(e) => tracing::debug!( - "convert_push_message; promised request has invalid content-length {:?}; promised_id={:?}", + ?promised_id, + "convert_push_message; promised request has invalid content-length {:?}", e, - promised_id, ), } return Err(UserError::MalformedHeaders); @@ -1328,6 +1342,8 @@ impl Peer { impl proto::Peer for Peer { type Poll = Request<()>; + const NAME: &'static str = "Server"; + fn is_server() -> bool { true } @@ -1471,7 +1487,7 @@ where { #[inline] fn from(flush: Flush>) -> Self { - Handshaking::Flushing(flush) + Handshaking::Flushing(flush.instrument(tracing::trace_span!("flush"))) } } @@ -1482,7 +1498,7 @@ where { #[inline] fn from(read: ReadPreface>) -> Self { - Handshaking::ReadingPreface(read) + Handshaking::ReadingPreface(read.instrument(tracing::trace_span!("read_preface"))) } } diff --git a/tests/h2-support/Cargo.toml b/tests/h2-support/Cargo.toml index b48dc36..c4e68b1 100644 --- a/tests/h2-support/Cargo.toml +++ b/tests/h2-support/Cargo.toml @@ -8,7 +8,8 @@ edition = "2018" h2 = { path = "../..", features = ["stream", "unstable"] } bytes = "0.5" -env_logger = "0.5.9" +tracing = "0.1" +tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "chrono", "ansi"] } futures = { version = "0.3", default-features = false } http = "0.2" tokio = { version = "0.2", features = ["time"] } diff --git a/tests/h2-support/src/lib.rs b/tests/h2-support/src/lib.rs index d88f6ca..3c13c0a 100644 --- a/tests/h2-support/src/lib.rs +++ b/tests/h2-support/src/lib.rs @@ -8,6 +8,7 @@ pub mod raw; pub mod frames; pub mod mock; pub mod prelude; +pub mod trace; pub mod util; mod client_ext; @@ -24,3 +25,19 @@ pub type Codec = h2::Codec; // This is the frame type that is sent pub type SendFrame = h2::frame::Frame; + +#[macro_export] +macro_rules! trace_init { + () => { + let _guard = $crate::trace::init(); + let span = $crate::prelude::tracing::info_span!( + "test", + "{}", + // get the name of the test thread to generate a unique span for the test + std::thread::current() + .name() + .expect("test threads must be named") + ); + let _e = span.enter(); + }; +} diff --git a/tests/h2-support/src/prelude.rs b/tests/h2-support/src/prelude.rs index 2e95b68..dafdd29 100644 --- a/tests/h2-support/src/prelude.rs +++ b/tests/h2-support/src/prelude.rs @@ -28,7 +28,7 @@ pub use super::assert::assert_frame_eq; // Re-export useful crates pub use tokio_test::io as mock_io; -pub use {bytes, env_logger, futures, http, tokio::io as tokio_io}; +pub use {bytes, futures, http, tokio::io as tokio_io, tracing, tracing_subscriber}; // Re-export primary future types pub use futures::{Future, Sink, Stream}; diff --git a/tests/h2-support/src/trace.rs b/tests/h2-support/src/trace.rs new file mode 100644 index 0000000..4ac1174 --- /dev/null +++ b/tests/h2-support/src/trace.rs @@ -0,0 +1,41 @@ +use std::{io, str}; +pub use tracing; +pub use tracing_subscriber; + +pub fn init() -> tracing::dispatcher::DefaultGuard { + tracing::subscriber::set_default( + tracing_subscriber::fmt() + .with_max_level(tracing::Level::TRACE) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) + .with_writer(PrintlnWriter { _p: () }) + .finish(), + ) +} + +struct PrintlnWriter { + _p: (), +} + +impl tracing_subscriber::fmt::MakeWriter for PrintlnWriter { + type Writer = PrintlnWriter; + fn make_writer(&self) -> Self::Writer { + PrintlnWriter { _p: () } + } +} + +impl io::Write for PrintlnWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + let s = str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + println!("{}", s); + Ok(s.len()) + } + + fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> io::Result<()> { + println!("{}", fmt); + Ok(()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 8175ce4..41f8f64 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -7,7 +7,7 @@ use std::task::Context; #[tokio::test] async fn handshake() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let mock = mock_io::Builder::new() .handshake() @@ -24,7 +24,7 @@ async fn handshake() { #[tokio::test] async fn client_other_thread() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -60,7 +60,7 @@ async fn client_other_thread() { #[tokio::test] async fn recv_invalid_server_stream_id() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let mock = mock_io::Builder::new() .handshake() @@ -96,7 +96,7 @@ async fn recv_invalid_server_stream_id() { #[tokio::test] async fn request_stream_id_overflows() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let h2 = async move { @@ -149,7 +149,7 @@ async fn request_stream_id_overflows() { #[tokio::test] async fn client_builder_max_concurrent_streams() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let mut settings = frame::Settings::default(); @@ -187,7 +187,7 @@ async fn client_builder_max_concurrent_streams() { #[tokio::test] async fn request_over_max_concurrent_streams_errors() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -286,7 +286,7 @@ async fn request_over_max_concurrent_streams_errors() { #[tokio::test] async fn send_request_poll_ready_when_connection_error() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -379,7 +379,7 @@ async fn send_request_poll_ready_when_connection_error() { #[tokio::test] async fn send_reset_notifies_recv_stream() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -432,7 +432,7 @@ async fn send_reset_notifies_recv_stream() { #[tokio::test] async fn http_11_request_without_scheme_or_authority() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -462,7 +462,7 @@ async fn http_11_request_without_scheme_or_authority() { #[tokio::test] async fn http_2_request_without_scheme_or_authority() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -499,7 +499,7 @@ fn request_with_h1_version() {} #[tokio::test] async fn request_with_connection_headers() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); // can't assert full handshake, since client never sends a request, and @@ -542,7 +542,7 @@ async fn request_with_connection_headers() { #[tokio::test] async fn connection_close_notifies_response_future() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { let settings = srv.assert_client_handshake().await; @@ -581,7 +581,7 @@ async fn connection_close_notifies_response_future() { #[tokio::test] async fn connection_close_notifies_client_poll_ready() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -626,7 +626,7 @@ async fn connection_close_notifies_client_poll_ready() { #[tokio::test] async fn sending_request_on_closed_connection() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -688,7 +688,7 @@ async fn sending_request_on_closed_connection() { #[tokio::test] async fn recv_too_big_headers() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -751,7 +751,7 @@ async fn recv_too_big_headers() { #[tokio::test] async fn pending_send_request_gets_reset_by_peer_properly() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let payload = Bytes::from(vec![0; (frame::DEFAULT_INITIAL_WINDOW_SIZE * 2) as usize]); @@ -823,7 +823,7 @@ async fn pending_send_request_gets_reset_by_peer_properly() { #[tokio::test] async fn request_without_path() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -854,7 +854,7 @@ async fn request_without_path() { #[tokio::test] async fn request_options_with_star() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); // Note the lack of trailing slash. @@ -899,7 +899,7 @@ async fn notify_on_send_capacity() { // stream, the client is notified. use tokio::sync::oneshot; - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let (done_tx, done_rx) = oneshot::channel(); @@ -979,7 +979,7 @@ async fn notify_on_send_capacity() { #[tokio::test] async fn send_stream_poll_reset() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -1017,7 +1017,7 @@ async fn drop_pending_open() { // This test checks that a stream queued for pending open behaves correctly when its // client drops. use tokio::sync::oneshot; - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let (init_tx, init_rx) = oneshot::channel(); @@ -1105,7 +1105,7 @@ async fn malformed_response_headers_dont_unlink_stream() { // no remaining references correctly resets the stream, without prematurely // unlinking it. use tokio::sync::oneshot; - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let (drop_tx, drop_rx) = oneshot::channel(); diff --git a/tests/h2-tests/tests/codec_read.rs b/tests/h2-tests/tests/codec_read.rs index 6ebe54d..95e895d 100644 --- a/tests/h2-tests/tests/codec_read.rs +++ b/tests/h2-tests/tests/codec_read.rs @@ -130,7 +130,7 @@ fn read_headers_empty_payload() {} #[tokio::test] async fn read_continuation_frames() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let large = build_large_headers(); @@ -191,7 +191,7 @@ async fn read_continuation_frames() { async fn update_max_frame_len_at_rest() { use futures::StreamExt; - let _ = env_logger::try_init(); + h2_support::trace_init!(); // TODO: add test for updating max frame length in flight as well? let mut codec = raw_codec! { read => [ diff --git a/tests/h2-tests/tests/codec_write.rs b/tests/h2-tests/tests/codec_write.rs index 2347f63..0b85a22 100644 --- a/tests/h2-tests/tests/codec_write.rs +++ b/tests/h2-tests/tests/codec_write.rs @@ -5,7 +5,7 @@ use h2_support::prelude::*; async fn write_continuation_frames() { // An invalid dependency ID results in a stream level error. The hpack // payload should still be decoded. - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let large = build_large_headers(); @@ -56,7 +56,7 @@ async fn write_continuation_frames() { async fn client_settings_header_table_size() { // A server sets the SETTINGS_HEADER_TABLE_SIZE to 0, test that the // client doesn't send indexed headers. - let _ = env_logger::try_init(); + h2_support::trace_init!(); let io = mock_io::Builder::new() // Read SETTINGS_HEADER_TABLE_SIZE = 0 @@ -99,7 +99,7 @@ async fn client_settings_header_table_size() { async fn server_settings_header_table_size() { // A client sets the SETTINGS_HEADER_TABLE_SIZE to 0, test that the // server doesn't send indexed headers. - let _ = env_logger::try_init(); + h2_support::trace_init!(); let io = mock_io::Builder::new() .read(MAGIC_PREFACE) diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 3ca65ac..4b6fe7a 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -7,7 +7,7 @@ use h2_support::util::yield_once; // explicitly requested. #[tokio::test] async fn send_data_without_requesting_capacity() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let payload = vec![0; 1024]; @@ -53,7 +53,7 @@ async fn send_data_without_requesting_capacity() { #[tokio::test] async fn release_capacity_sends_window_update() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let payload = vec![0u8; 16_384]; let payload_len = payload.len(); @@ -120,7 +120,7 @@ async fn release_capacity_sends_window_update() { #[tokio::test] async fn release_capacity_of_small_amount_does_not_send_window_update() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let payload = [0; 16]; @@ -175,7 +175,7 @@ fn expand_window_calls_are_coalesced() {} #[tokio::test] async fn recv_data_overflows_connection_window() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); @@ -238,7 +238,7 @@ async fn recv_data_overflows_connection_window() { #[tokio::test] async fn recv_data_overflows_stream_window() { // this tests for when streams have smaller windows than their connection - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); @@ -295,7 +295,7 @@ fn recv_window_update_causes_overflow() { #[tokio::test] async fn stream_error_release_connection_capacity() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -371,7 +371,7 @@ async fn stream_error_release_connection_capacity() { #[tokio::test] async fn stream_close_by_data_frame_releases_capacity() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE as usize; @@ -443,7 +443,7 @@ async fn stream_close_by_data_frame_releases_capacity() { #[tokio::test] async fn stream_close_by_trailers_frame_releases_capacity() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE as usize; @@ -516,7 +516,7 @@ async fn stream_close_by_trailers_frame_releases_capacity() { #[tokio::test] async fn stream_close_by_send_reset_frame_releases_capacity() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -575,7 +575,7 @@ fn stream_close_by_recv_reset_frame_releases_capacity() {} #[tokio::test] async fn recv_window_update_on_stream_closed_by_data_frame() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let h2 = async move { @@ -620,7 +620,7 @@ async fn recv_window_update_on_stream_closed_by_data_frame() { #[tokio::test] async fn reserved_capacity_assigned_in_multi_window_updates() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let h2 = async move { @@ -685,7 +685,7 @@ async fn reserved_capacity_assigned_in_multi_window_updates() { async fn connection_notified_on_released_capacity() { use tokio::sync::{mpsc, oneshot}; - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); // We're going to run the connection on a thread in order to isolate task @@ -794,7 +794,7 @@ async fn connection_notified_on_released_capacity() { #[tokio::test] async fn recv_settings_removes_available_capacity() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let mut settings = frame::Settings::default(); @@ -841,7 +841,7 @@ async fn recv_settings_removes_available_capacity() { #[tokio::test] async fn recv_settings_keeps_assigned_capacity() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let (sent_settings, sent_settings_rx) = futures::channel::oneshot::channel(); @@ -886,7 +886,7 @@ async fn recv_settings_keeps_assigned_capacity() { #[tokio::test] async fn recv_no_init_window_then_receive_some_init_window() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let mut settings = frame::Settings::default(); @@ -942,7 +942,7 @@ async fn settings_lowered_capacity_returns_capacity_to_connection() { use futures::channel::oneshot; use futures::future::{select, Either}; - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); @@ -1049,7 +1049,7 @@ async fn settings_lowered_capacity_returns_capacity_to_connection() { #[tokio::test] async fn client_increase_target_window_size() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -1069,7 +1069,7 @@ async fn client_increase_target_window_size() { #[tokio::test] async fn increase_target_window_size_after_using_some() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -1110,7 +1110,7 @@ async fn increase_target_window_size_after_using_some() { #[tokio::test] async fn decrease_target_window_size() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -1155,7 +1155,7 @@ async fn decrease_target_window_size() { #[tokio::test] async fn client_update_initial_window_size() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE * 2; @@ -1230,7 +1230,7 @@ async fn client_update_initial_window_size() { #[tokio::test] async fn client_decrease_initial_window_size() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -1355,7 +1355,7 @@ async fn client_decrease_initial_window_size() { #[tokio::test] async fn server_target_window_size() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -1377,7 +1377,7 @@ async fn server_target_window_size() { #[tokio::test] async fn recv_settings_increase_window_size_after_using_some() { // See https://github.com/hyperium/h2/issues/208 - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let new_win_size = 16_384 * 4; // 1 bigger than default @@ -1419,7 +1419,7 @@ async fn recv_settings_increase_window_size_after_using_some() { #[tokio::test] async fn reserve_capacity_after_peer_closes() { // See https://github.com/hyperium/h2/issues/300 - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -1456,7 +1456,7 @@ async fn reserve_capacity_after_peer_closes() { async fn reset_stream_waiting_for_capacity() { // This tests that receiving a reset on a stream that has some available // connection-level window reassigns that window to another stream. - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); @@ -1517,7 +1517,7 @@ async fn reset_stream_waiting_for_capacity() { #[tokio::test] async fn data_padding() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let mut body = Vec::new(); diff --git a/tests/h2-tests/tests/ping_pong.rs b/tests/h2-tests/tests/ping_pong.rs index f093b43..a57f35c 100644 --- a/tests/h2-tests/tests/ping_pong.rs +++ b/tests/h2-tests/tests/ping_pong.rs @@ -6,7 +6,7 @@ use h2_support::prelude::*; #[tokio::test] async fn recv_single_ping() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (m, mut mock) = mock::new(); // Create the handshake @@ -36,7 +36,7 @@ async fn recv_single_ping() { #[tokio::test] async fn recv_multiple_pings() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -58,7 +58,7 @@ async fn recv_multiple_pings() { #[tokio::test] async fn pong_has_highest_priority() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let data = Bytes::from(vec![0; 16_384]); @@ -96,7 +96,7 @@ async fn pong_has_highest_priority() { #[tokio::test] async fn user_ping_pong() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -138,7 +138,7 @@ async fn user_ping_pong() { #[tokio::test] async fn user_notifies_when_connection_closes() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { let settings = srv.assert_client_handshake().await; diff --git a/tests/h2-tests/tests/prioritization.rs b/tests/h2-tests/tests/prioritization.rs index 18084d9..7c26810 100644 --- a/tests/h2-tests/tests/prioritization.rs +++ b/tests/h2-tests/tests/prioritization.rs @@ -6,7 +6,7 @@ use std::task::Context; #[tokio::test] async fn single_stream_send_large_body() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let payload = vec![0; 1024]; @@ -66,7 +66,7 @@ async fn single_stream_send_large_body() { #[tokio::test] async fn multiple_streams_with_payload_greater_than_default_window() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let payload = vec![0; 16384 * 5 - 1]; let payload_clone = payload.clone(); @@ -129,7 +129,7 @@ async fn multiple_streams_with_payload_greater_than_default_window() { #[tokio::test] async fn single_stream_send_extra_large_body_multi_frames_one_buffer() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let payload = vec![0; 32_768]; @@ -193,7 +193,7 @@ async fn single_stream_send_extra_large_body_multi_frames_one_buffer() { #[tokio::test] async fn single_stream_send_body_greater_than_default_window() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let payload = vec![0; 16384 * 5 - 1]; @@ -279,7 +279,7 @@ async fn single_stream_send_body_greater_than_default_window() { #[tokio::test] async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let payload = vec![0; 32_768]; @@ -341,7 +341,7 @@ async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { #[tokio::test] async fn send_data_receive_window_update() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (m, mut mock) = mock::new(); let h2 = async move { diff --git a/tests/h2-tests/tests/push_promise.rs b/tests/h2-tests/tests/push_promise.rs index f786a72..a5a7dfe 100644 --- a/tests/h2-tests/tests/push_promise.rs +++ b/tests/h2-tests/tests/push_promise.rs @@ -4,7 +4,7 @@ use h2_support::prelude::*; #[tokio::test] async fn recv_push_works() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let mock = async move { @@ -62,7 +62,7 @@ async fn recv_push_works() { #[tokio::test] async fn pushed_streams_arent_dropped_too_early() { // tests that by default, received push promises work - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let mock = async move { @@ -128,7 +128,7 @@ async fn pushed_streams_arent_dropped_too_early() { #[tokio::test] async fn recv_push_when_push_disabled_is_conn_error() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let mock = async move { @@ -186,7 +186,7 @@ async fn recv_push_when_push_disabled_is_conn_error() { #[tokio::test] async fn pending_push_promises_reset_when_dropped() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -233,7 +233,7 @@ async fn pending_push_promises_reset_when_dropped() { #[tokio::test] async fn recv_push_promise_over_max_header_list_size() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -284,7 +284,7 @@ async fn recv_push_promise_over_max_header_list_size() { #[tokio::test] async fn recv_invalid_push_promise_headers_is_stream_protocol_error() { // Unsafe method or content length is stream protocol error - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let mock = async move { @@ -348,7 +348,7 @@ fn recv_push_promise_with_wrong_authority_is_stream_error() { #[tokio::test] async fn recv_push_promise_skipped_stream_id() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let mock = async move { @@ -402,7 +402,7 @@ async fn recv_push_promise_skipped_stream_id() { #[tokio::test] async fn recv_push_promise_dup_stream_id() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let mock = async move { diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index 1916138..3a76491 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -10,7 +10,7 @@ const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; #[tokio::test] async fn read_preface_in_multiple_frames() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let mock = mock_io::Builder::new() .read(b"PRI * HTTP/2.0") @@ -28,7 +28,7 @@ async fn read_preface_in_multiple_frames() { #[tokio::test] async fn server_builder_set_max_concurrent_streams() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let mut settings = frame::Settings::default(); @@ -72,7 +72,7 @@ async fn server_builder_set_max_concurrent_streams() { #[tokio::test] async fn serve_request() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -107,7 +107,7 @@ async fn serve_request() { #[tokio::test] async fn serve_connect() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -138,7 +138,7 @@ async fn serve_connect() { #[tokio::test] async fn push_request() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -222,7 +222,7 @@ async fn push_request() { #[tokio::test] async fn push_request_against_concurrency() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -306,7 +306,7 @@ async fn push_request_against_concurrency() { #[tokio::test] async fn push_request_with_data() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -372,7 +372,7 @@ async fn push_request_with_data() { #[tokio::test] async fn push_request_between_data() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -443,7 +443,7 @@ fn accept_with_pending_connections_after_socket_close() {} #[tokio::test] async fn recv_invalid_authority() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let bad_auth = util::byte_str("not:a/good authority"); @@ -470,7 +470,7 @@ async fn recv_invalid_authority() { #[tokio::test] async fn recv_connection_header() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let req = |id, name, val| { @@ -507,7 +507,7 @@ async fn recv_connection_header() { #[tokio::test] async fn sends_reset_cancel_when_req_body_is_dropped() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -539,7 +539,7 @@ async fn sends_reset_cancel_when_req_body_is_dropped() { #[tokio::test] async fn abrupt_shutdown() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -583,7 +583,7 @@ async fn abrupt_shutdown() { #[tokio::test] async fn graceful_shutdown() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -658,7 +658,7 @@ async fn graceful_shutdown() { #[tokio::test] async fn goaway_even_if_client_sent_goaway() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -707,7 +707,7 @@ async fn goaway_even_if_client_sent_goaway() { #[tokio::test] async fn sends_reset_cancel_when_res_body_is_dropped() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -761,7 +761,7 @@ async fn sends_reset_cancel_when_res_body_is_dropped() { #[tokio::test] async fn too_big_headers_sends_431() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -797,7 +797,7 @@ async fn too_big_headers_sends_431() { #[tokio::test] async fn too_big_headers_sends_reset_after_431_if_not_eos() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -832,7 +832,7 @@ async fn too_big_headers_sends_reset_after_431_if_not_eos() { #[tokio::test] async fn poll_reset() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -872,7 +872,7 @@ async fn poll_reset() { #[tokio::test] async fn poll_reset_io_error() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -913,7 +913,7 @@ async fn poll_reset_io_error() { #[tokio::test] async fn poll_reset_after_send_response_is_user_error() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { @@ -967,7 +967,7 @@ async fn poll_reset_after_send_response_is_user_error() { #[tokio::test] async fn server_error_on_unclean_shutdown() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let srv = server::Builder::new().handshake::<_, Bytes>(io); @@ -980,7 +980,7 @@ async fn server_error_on_unclean_shutdown() { #[tokio::test] async fn request_without_authority() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let client = async move { diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index 2082d5f..16d1a75 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -9,7 +9,7 @@ use tokio::sync::oneshot; #[tokio::test] async fn send_recv_headers_only() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let mock = mock_io::Builder::new() .handshake() @@ -42,7 +42,7 @@ async fn send_recv_headers_only() { #[tokio::test] async fn send_recv_data() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let mock = mock_io::Builder::new() .handshake() @@ -104,7 +104,7 @@ async fn send_recv_data() { #[tokio::test] async fn send_headers_recv_data_single_frame() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let mock = mock_io::Builder::new() .handshake() @@ -153,7 +153,7 @@ async fn send_headers_recv_data_single_frame() { #[tokio::test] async fn closed_streams_are_released() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let h2 = async move { @@ -196,7 +196,7 @@ async fn closed_streams_are_released() { #[tokio::test] async fn errors_if_recv_frame_exceeds_max_frame_size() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let h2 = async move { @@ -239,7 +239,7 @@ async fn errors_if_recv_frame_exceeds_max_frame_size() { #[tokio::test] async fn configure_max_frame_size() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let h2 = async move { @@ -278,7 +278,7 @@ async fn configure_max_frame_size() { #[tokio::test] async fn recv_goaway_finishes_processed_streams() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -332,7 +332,7 @@ async fn recv_goaway_finishes_processed_streams() { #[tokio::test] async fn recv_goaway_with_higher_last_processed_id() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -366,7 +366,7 @@ async fn recv_goaway_with_higher_last_processed_id() { #[tokio::test] async fn recv_next_stream_id_updated_by_malformed_headers() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut client) = mock::new(); let bad_auth = util::byte_str("not:a/good authority"); @@ -404,7 +404,7 @@ async fn recv_next_stream_id_updated_by_malformed_headers() { #[tokio::test] async fn skipped_stream_ids_are_implicitly_closed() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -445,7 +445,7 @@ async fn skipped_stream_ids_are_implicitly_closed() { #[tokio::test] async fn send_rst_stream_allows_recv_data() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -490,7 +490,7 @@ async fn send_rst_stream_allows_recv_data() { #[tokio::test] async fn send_rst_stream_allows_recv_trailers() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -531,7 +531,7 @@ async fn send_rst_stream_allows_recv_trailers() { #[tokio::test] async fn rst_stream_expires() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -582,7 +582,7 @@ async fn rst_stream_expires() { #[tokio::test] async fn rst_stream_max() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -653,7 +653,7 @@ async fn rst_stream_max() { #[tokio::test] async fn reserved_state_recv_window_update() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { @@ -692,7 +692,7 @@ async fn reserved_state_recv_window_update() { /* #[test] fn send_data_after_headers_eos() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let mock = mock_io::Builder::new() .handshake() @@ -733,7 +733,7 @@ async fn rst_while_closing() { // Test to reproduce panic in issue #246 --- receipt of a RST_STREAM frame // on a stream in the Half Closed (remote) state with a queued EOS causes // a panic. - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); // Rendevous when we've queued a trailers frame @@ -794,7 +794,7 @@ async fn rst_with_buffered_data() { // the data is fully flushed. Given that resetting a stream requires // clearing all associated state for that stream, this test ensures that the // buffered up frame is correctly handled. - let _ = env_logger::try_init(); + h2_support::trace_init!(); // This allows the settings + headers frame through let (io, mut srv) = mock::new_with_write_capacity(73); @@ -846,7 +846,7 @@ async fn err_with_buffered_data() { // the data is fully flushed. Given that resetting a stream requires // clearing all associated state for that stream, this test ensures that the // buffered up frame is correctly handled. - let _ = env_logger::try_init(); + h2_support::trace_init!(); // This allows the settings + headers frame through let (io, mut srv) = mock::new_with_write_capacity(73); @@ -901,7 +901,7 @@ async fn send_err_with_buffered_data() { // the data is fully flushed. Given that resetting a stream requires // clearing all associated state for that stream, this test ensures that the // buffered up frame is correctly handled. - let _ = env_logger::try_init(); + h2_support::trace_init!(); // This allows the settings + headers frame through let (io, mut srv) = mock::new_with_write_capacity(73); @@ -963,7 +963,7 @@ async fn send_err_with_buffered_data() { #[tokio::test] async fn srv_window_update_on_lower_stream_id() { // See https://github.com/hyperium/h2/issues/208 - let _ = env_logger::try_init(); + h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { diff --git a/tests/h2-tests/tests/trailers.rs b/tests/h2-tests/tests/trailers.rs index 0786655..08a463a 100644 --- a/tests/h2-tests/tests/trailers.rs +++ b/tests/h2-tests/tests/trailers.rs @@ -3,7 +3,7 @@ use h2_support::prelude::*; #[tokio::test] async fn recv_trailers_only() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let mock = mock_io::Builder::new() .handshake() @@ -53,7 +53,7 @@ async fn recv_trailers_only() { #[tokio::test] async fn send_trailers_immediately() { - let _ = env_logger::try_init(); + h2_support::trace_init!(); let mock = mock_io::Builder::new() .handshake()