start adding tracing spans to internals (#478)

We've adopted `tracing` for diagnostics, but currently, it is just being
used as a drop-in replacement for the `log` crate. Ideally, we would
want to start emitting more structured diagnostics, using `tracing`'s
`Span`s and structured key-value fields.

A lot of the logging in `h2` is already written in a style that imitates
the formatting of structured key-value logs, but as textual log
messages. Migrating the logs to structured `tracing` events therefore is
pretty easy to do. I've also started adding spans, mostly in the read
path.

Finally, I've updated the tests to use `tracing` rather than
`env_logger`. The tracing setup happens in a macro, so that a span for
each test with the test's name can be generated and entered. This will
make the test output easier to read if multiple tests are run
concurrently with `--nocapture`.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman
2020-08-17 17:29:22 -07:00
committed by GitHub
parent d3c2bba18b
commit fc7f63f641
25 changed files with 363 additions and 257 deletions

View File

@@ -149,6 +149,7 @@ use std::task::{Context, Poll};
use std::time::Duration;
use std::usize;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing_futures::Instrument;
/// Initializes new HTTP/2.0 streams on a connection by sending a request.
///
@@ -1115,7 +1116,10 @@ where
T: AsyncRead + AsyncWrite + Unpin,
{
let builder = Builder::new();
builder.handshake(io).await
builder
.handshake(io)
.instrument(tracing::trace_span!("client_handshake", io = %std::any::type_name::<T>()))
.await
}
// ===== impl Connection =====
@@ -1438,6 +1442,8 @@ impl Peer {
impl proto::Peer for Peer {
type Poll = Response<()>;
const NAME: &'static str = "Client";
fn r#dyn() -> proto::DynPeer {
proto::DynPeer::Client
}

View File

@@ -61,6 +61,8 @@ impl<T> FramedRead<T> {
fn decode_frame(&mut self, mut bytes: BytesMut) -> Result<Option<Frame>, RecvError> {
use self::RecvError::*;
let span = tracing::trace_span!("FramedRead::decode_frame", offset = bytes.len());
let _e = span.enter();
tracing::trace!("decoding frame from {}B", bytes.len());
@@ -74,7 +76,7 @@ impl<T> FramedRead<T> {
let kind = head.kind();
tracing::trace!(" -> kind={:?}", kind);
tracing::trace!(frame.kind = ?kind);
macro_rules! header_block {
($frame:ident, $head:ident, $bytes:ident) => ({
@@ -338,6 +340,8 @@ where
type Item = Result<Frame, RecvError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let span = tracing::trace_span!("FramedRead::poll_next");
let _e = span.enter();
loop {
tracing::trace!("poll");
let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
@@ -346,9 +350,9 @@ where
None => return Poll::Ready(None),
};
tracing::trace!("poll; bytes={}B", bytes.len());
tracing::trace!(read.bytes = bytes.len());
if let Some(frame) = self.decode_frame(bytes)? {
tracing::debug!("received; frame={:?}", frame);
tracing::debug!(?frame, "received");
return Poll::Ready(Some(Ok(frame)));
}
}

View File

@@ -105,8 +105,10 @@ where
pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
// Ensure that we have enough capacity to accept the write.
assert!(self.has_capacity());
let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item);
let _e = span.enter();
tracing::debug!("send; frame={:?}", item);
tracing::debug!(frame = ?item, "send");
match item {
Frame::Data(mut v) => {
@@ -150,19 +152,19 @@ where
}
Frame::Settings(v) => {
v.encode(self.buf.get_mut());
tracing::trace!("encoded settings; rem={:?}", self.buf.remaining());
tracing::trace!(rem = self.buf.remaining(), "encoded settings");
}
Frame::GoAway(v) => {
v.encode(self.buf.get_mut());
tracing::trace!("encoded go_away; rem={:?}", self.buf.remaining());
tracing::trace!(rem = self.buf.remaining(), "encoded go_away");
}
Frame::Ping(v) => {
v.encode(self.buf.get_mut());
tracing::trace!("encoded ping; rem={:?}", self.buf.remaining());
tracing::trace!(rem = self.buf.remaining(), "encoded ping");
}
Frame::WindowUpdate(v) => {
v.encode(self.buf.get_mut());
tracing::trace!("encoded window_update; rem={:?}", self.buf.remaining());
tracing::trace!(rem = self.buf.remaining(), "encoded window_update");
}
Frame::Priority(_) => {
@@ -174,7 +176,7 @@ where
}
Frame::Reset(v) => {
v.encode(self.buf.get_mut());
tracing::trace!("encoded reset; rem={:?}", self.buf.remaining());
tracing::trace!(rem = self.buf.remaining(), "encoded reset");
}
}
@@ -183,18 +185,19 @@ where
/// Flush buffered data to the wire
pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
tracing::trace!("flush");
let span = tracing::trace_span!("FramedWrite::flush");
let _e = span.enter();
loop {
while !self.is_empty() {
match self.next {
Some(Next::Data(ref mut frame)) => {
tracing::trace!(" -> queued data frame");
tracing::trace!(queued_data_frame = true);
let mut buf = (&mut self.buf).chain(frame.payload_mut());
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut buf))?;
}
_ => {
tracing::trace!(" -> not a queued data frame");
tracing::trace!(queued_data_frame = false);
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut self.buf))?;
}
}

View File

@@ -183,6 +183,9 @@ impl Decoder {
self.last_max_update = size;
}
let span = tracing::trace_span!("hpack::decode");
let _e = span.enter();
tracing::trace!("decode");
while let Some(ty) = peek_u8(src) {
@@ -191,14 +194,14 @@ impl Decoder {
// determined from the first byte.
match Representation::load(ty)? {
Indexed => {
tracing::trace!(" Indexed; rem={:?}", src.remaining());
tracing::trace!(rem = src.remaining(), kind = %"Indexed");
can_resize = false;
let entry = self.decode_indexed(src)?;
consume(src);
f(entry);
}
LiteralWithIndexing => {
tracing::trace!(" LiteralWithIndexing; rem={:?}", src.remaining());
tracing::trace!(rem = src.remaining(), kind = %"LiteralWithIndexing");
can_resize = false;
let entry = self.decode_literal(src, true)?;
@@ -209,14 +212,14 @@ impl Decoder {
f(entry);
}
LiteralWithoutIndexing => {
tracing::trace!(" LiteralWithoutIndexing; rem={:?}", src.remaining());
tracing::trace!(rem = src.remaining(), kind = %"LiteralWithoutIndexing");
can_resize = false;
let entry = self.decode_literal(src, false)?;
consume(src);
f(entry);
}
LiteralNeverIndexed => {
tracing::trace!(" LiteralNeverIndexed; rem={:?}", src.remaining());
tracing::trace!(rem = src.remaining(), kind = %"LiteralNeverIndexed");
can_resize = false;
let entry = self.decode_literal(src, false)?;
consume(src);
@@ -226,7 +229,7 @@ impl Decoder {
f(entry);
}
SizeUpdate => {
tracing::trace!(" SizeUpdate; rem={:?}", src.remaining());
tracing::trace!(rem = src.remaining(), kind = %"SizeUpdate");
if !can_resize {
return Err(DecoderError::InvalidMaxDynamicSize);
}
@@ -249,9 +252,9 @@ impl Decoder {
}
tracing::debug!(
"Decoder changed max table size from {} to {}",
self.table.size(),
new_size
from = self.table.size(),
to = new_size,
"Decoder changed max table size"
);
self.table.set_max_size(new_size);
@@ -302,11 +305,7 @@ impl Decoder {
let len = decode_int(buf, 7)?;
if len > buf.remaining() {
tracing::trace!(
"decode_string underflow; len={}; remaining={}",
len,
buf.remaining()
);
tracing::trace!(len, remaining = buf.remaining(), "decode_string underflow",);
return Err(DecoderError::NeedMore(NeedMore::StringUnderflow));
}

View File

@@ -86,7 +86,11 @@ impl Encoder {
where
I: Iterator<Item = Header<Option<HeaderName>>>,
{
let span = tracing::trace_span!("hpack::encode");
let _e = span.enter();
let pos = position(dst);
tracing::trace!(pos, "encoding at");
if let Err(e) = self.encode_size_updates(dst) {
if e == EncoderError::BufferOverflow {

View File

@@ -597,7 +597,7 @@ impl Table {
}
assert!(dist <= their_dist,
"could not find entry; actual={}; desired={};" +
"could not find entry; actual={}; desired={}" +
"probe={}, dist={}; their_dist={}; index={}; msg={}",
actual, desired, probe, dist, their_dist,
index.wrapping_sub(self.inserted), msg);

View File

@@ -44,6 +44,9 @@ where
/// Stream state handler
streams: Streams<B, P>,
/// A `tracing` span tracking the lifetime of the connection.
span: tracing::Span,
/// Client or server
_phantom: PhantomData<P>,
}
@@ -100,6 +103,7 @@ where
ping_pong: PingPong::new(),
settings: Settings::new(config.settings),
streams,
span: tracing::debug_span!("Connection", peer = %P::NAME),
_phantom: PhantomData,
}
}
@@ -121,6 +125,9 @@ where
/// Returns `RecvError` as this may raise errors that are caused by delayed
/// processing of received frames.
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), RecvError>> {
let _e = self.span.enter();
let span = tracing::trace_span!("poll_ready");
let _e = span.enter();
// The order of these calls don't really matter too much
ready!(self.ping_pong.send_pending_pong(cx, &mut self.codec))?;
ready!(self.ping_pong.send_pending_ping(cx, &mut self.codec))?;
@@ -200,9 +207,18 @@ where
/// Advances the internal state of the connection.
pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), proto::Error>> {
// XXX(eliza): cloning the span is unfortunately necessary here in
// order to placate the borrow checker — `self` is mutably borrowed by
// `poll2`, which means that we can't borrow `self.span` to enter it.
// The clone is just an atomic ref bump.
let span = self.span.clone();
let _e = span.enter();
let span = tracing::trace_span!("poll");
let _e = span.enter();
use crate::codec::RecvError::*;
loop {
tracing::trace!(connection.state = ?self.state);
// TODO: probably clean up this glob of code
match self.state {
// When open, continue to poll a frame
@@ -230,7 +246,7 @@ where
// error. This is handled by setting a GOAWAY frame followed by
// terminating the connection.
Poll::Ready(Err(Connection(e))) => {
tracing::debug!("Connection::poll; connection error={:?}", e);
tracing::debug!(error = ?e, "Connection::poll; connection error");
// We may have already sent a GOAWAY for this error,
// if so, don't send another, just flush and close up.
@@ -250,7 +266,7 @@ where
// This is handled by resetting the frame then trying to read
// another frame.
Poll::Ready(Err(Stream { id, reason })) => {
tracing::trace!("stream error; id={:?}; reason={:?}", id, reason);
tracing::trace!(?id, ?reason, "stream error");
self.streams.send_reset(id, reason);
}
// Attempting to read a frame resulted in an I/O error. All
@@ -258,7 +274,7 @@ where
//
// TODO: Are I/O errors recoverable?
Poll::Ready(Err(Io(e))) => {
tracing::debug!("Connection::poll; IO error={:?}", e);
tracing::debug!(error = ?e, "Connection::poll; IO error");
let e = e.into();
// Reset all active streams
@@ -317,28 +333,28 @@ where
match ready!(Pin::new(&mut self.codec).poll_next(cx)?) {
Some(Headers(frame)) => {
tracing::trace!("recv HEADERS; frame={:?}", frame);
tracing::trace!(?frame, "recv HEADERS");
self.streams.recv_headers(frame)?;
}
Some(Data(frame)) => {
tracing::trace!("recv DATA; frame={:?}", frame);
tracing::trace!(?frame, "recv DATA");
self.streams.recv_data(frame)?;
}
Some(Reset(frame)) => {
tracing::trace!("recv RST_STREAM; frame={:?}", frame);
tracing::trace!(?frame, "recv RST_STREAM");
self.streams.recv_reset(frame)?;
}
Some(PushPromise(frame)) => {
tracing::trace!("recv PUSH_PROMISE; frame={:?}", frame);
tracing::trace!(?frame, "recv PUSH_PROMISE");
self.streams.recv_push_promise(frame)?;
}
Some(Settings(frame)) => {
tracing::trace!("recv SETTINGS; frame={:?}", frame);
tracing::trace!(?frame, "recv SETTINGS");
self.settings
.recv_settings(frame, &mut self.codec, &mut self.streams)?;
}
Some(GoAway(frame)) => {
tracing::trace!("recv GOAWAY; frame={:?}", frame);
tracing::trace!(?frame, "recv GOAWAY");
// This should prevent starting new streams,
// but should allow continuing to process current streams
// until they are all EOS. Once they are, State should
@@ -347,7 +363,7 @@ where
self.error = Some(frame.reason());
}
Some(Ping(frame)) => {
tracing::trace!("recv PING; frame={:?}", frame);
tracing::trace!(?frame, "recv PING");
let status = self.ping_pong.recv_ping(frame);
if status.is_shutdown() {
assert!(
@@ -360,11 +376,11 @@ where
}
}
Some(WindowUpdate(frame)) => {
tracing::trace!("recv WINDOW_UPDATE; frame={:?}", frame);
tracing::trace!(?frame, "recv WINDOW_UPDATE");
self.streams.recv_window_update(frame)?;
}
Some(Priority(frame)) => {
tracing::trace!("recv PRIORITY; frame={:?}", frame);
tracing::trace!(?frame, "recv PRIORITY");
// TODO: handle
}
None => {

View File

@@ -11,6 +11,7 @@ use std::fmt;
pub(crate) trait Peer {
/// Message type polled from the transport
type Poll: fmt::Debug;
const NAME: &'static str;
fn r#dyn() -> Dyn;

View File

@@ -104,6 +104,8 @@ impl Prioritize {
stream: &mut store::Ptr,
task: &mut Option<Waker>,
) {
let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
let _e = span.enter();
// Queue the frame in the buffer
stream.pending_send.push_back(buffer, frame);
self.schedule_send(stream, task);
@@ -112,7 +114,7 @@ impl Prioritize {
pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
// If the stream is waiting to be opened, nothing more to do.
if stream.is_send_ready() {
tracing::trace!("schedule_send; {:?}", stream.id);
tracing::trace!(?stream.id, "schedule_send");
// Queue the stream
self.pending_send.push(stream);
@@ -158,12 +160,10 @@ impl Prioritize {
// Update the buffered data counter
stream.buffered_send_data += sz;
tracing::trace!(
"send_data; sz={}; buffered={}; requested={}",
sz,
stream.buffered_send_data,
stream.requested_send_capacity
);
let span =
tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
let _e = span.enter();
tracing::trace!(buffered = stream.buffered_send_data);
// Implicitly request more send capacity if not enough has been
// requested yet.
@@ -180,9 +180,8 @@ impl Prioritize {
}
tracing::trace!(
"send_data (2); available={}; buffered={}",
stream.send_flow.available(),
stream.buffered_send_data
available = %stream.send_flow.available(),
buffered = stream.buffered_send_data,
);
// The `stream.buffered_send_data == 0` check is here so that, if a zero
@@ -214,13 +213,14 @@ impl Prioritize {
stream: &mut store::Ptr,
counts: &mut Counts,
) {
tracing::trace!(
"reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
stream.id,
capacity,
capacity + stream.buffered_send_data,
stream.requested_send_capacity
let span = tracing::trace_span!(
"reserve_capacity",
?stream.id,
requested = capacity,
effective = capacity + stream.buffered_send_data,
curr = stream.requested_send_capacity
);
let _e = span.enter();
// Actual capacity is `capacity` + the current amount of buffered data.
// If it were less, then we could never send out the buffered data.
@@ -266,13 +266,14 @@ impl Prioritize {
inc: WindowSize,
stream: &mut store::Ptr,
) -> Result<(), Reason> {
tracing::trace!(
"recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
stream.id,
stream.state,
let span = tracing::trace_span!(
"recv_stream_window_update",
?stream.id,
?stream.state,
inc,
stream.send_flow
flow = ?stream.send_flow
);
let _e = span.enter();
if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
// We can't send any data, so don't bother doing anything else.
@@ -324,9 +325,11 @@ impl Prioritize {
}
pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
let span = tracing::trace_span!("clear_pending_capacity");
let _e = span.enter();
while let Some(stream) = self.pending_capacity.pop(store) {
counts.transition(stream, |_, stream| {
tracing::trace!("clear_pending_capacity; stream={:?}", stream.id);
tracing::trace!(?stream.id, "clear_pending_capacity");
})
}
}
@@ -339,7 +342,8 @@ impl Prioritize {
) where
R: Resolve,
{
tracing::trace!("assign_connection_capacity; inc={}", inc);
let span = tracing::trace_span!("assign_connection_capacity", inc);
let _e = span.enter();
self.flow.assign_capacity(inc);
@@ -382,15 +386,14 @@ impl Prioritize {
// Can't assign more than what is available
stream.send_flow.window_size() - stream.send_flow.available().as_size(),
);
let span = tracing::trace_span!("try_assign_capacity", ?stream.id);
let _e = span.enter();
tracing::trace!(
"try_assign_capacity; stream={:?}, requested={}; additional={}; buffered={}; window={}; conn={}",
stream.id,
total_requested,
requested = total_requested,
additional,
stream.buffered_send_data,
stream.send_flow.window_size(),
self.flow.available()
buffered = stream.buffered_send_data,
window = stream.send_flow.window_size(),
conn = %self.flow.available()
);
if additional == 0 {
@@ -416,7 +419,7 @@ impl Prioritize {
// TODO: Should prioritization factor into this?
let assign = cmp::min(conn_available, additional);
tracing::trace!(" assigning; stream={:?}, capacity={}", stream.id, assign,);
tracing::trace!(capacity = assign, "assigning");
// Assign the capacity to the stream
stream.assign_capacity(assign);
@@ -426,11 +429,10 @@ impl Prioritize {
}
tracing::trace!(
"try_assign_capacity(2); available={}; requested={}; buffered={}; has_unavailable={:?}",
stream.send_flow.available(),
stream.requested_send_capacity,
stream.buffered_send_data,
stream.send_flow.has_unavailable()
available = %stream.send_flow.available(),
requested = stream.requested_send_capacity,
buffered = stream.buffered_send_data,
has_unavailable = %stream.send_flow.has_unavailable()
);
if stream.send_flow.available() < stream.requested_send_capacity
@@ -492,7 +494,7 @@ impl Prioritize {
match self.pop_frame(buffer, store, max_frame_len, counts) {
Some(frame) => {
tracing::trace!("writing frame={:?}", frame);
tracing::trace!(?frame, "writing");
debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
if let Frame::Data(ref frame) = frame {
@@ -538,14 +540,15 @@ impl Prioritize {
where
B: Buf,
{
tracing::trace!("try reclaim frame");
let span = tracing::trace_span!("try_reclaim_frame");
let _e = span.enter();
// First check if there are any data chunks to take back
if let Some(frame) = dst.take_last_data_frame() {
tracing::trace!(
" -> reclaimed; frame={:?}; sz={}",
frame,
frame.payload().inner.get_ref().remaining()
?frame,
sz = frame.payload().inner.get_ref().remaining(),
"reclaimed"
);
let mut eos = false;
@@ -603,11 +606,12 @@ impl Prioritize {
}
pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
tracing::trace!("clear_queue; stream={:?}", stream.id);
let span = tracing::trace_span!("clear_queue", ?stream.id);
let _e = span.enter();
// TODO: make this more efficient?
while let Some(frame) = stream.pending_send.pop_front(buffer) {
tracing::trace!("dropping; frame={:?}", frame);
tracing::trace!(?frame, "dropping");
}
stream.buffered_send_data = 0;
@@ -644,16 +648,14 @@ impl Prioritize {
where
B: Buf,
{
tracing::trace!("pop_frame");
let span = tracing::trace_span!("pop_frame");
let _e = span.enter();
loop {
match self.pending_send.pop(store) {
Some(mut stream) => {
tracing::trace!(
"pop_frame; stream={:?}; stream.state={:?}",
stream.id,
stream.state
);
let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
let _e = span.enter();
// It's possible that this stream, besides having data to send,
// is also queued to send a reset, and thus is already in the queue
@@ -662,11 +664,7 @@ impl Prioritize {
// To be safe, we just always ask the stream.
let is_pending_reset = stream.is_pending_reset_expiration();
tracing::trace!(
" --> stream={:?}; is_pending_reset={:?};",
stream.id,
is_pending_reset
);
tracing::trace!(is_pending_reset);
let frame = match stream.pending_send.pop_front(buffer) {
Some(Frame::Data(mut frame)) => {
@@ -676,24 +674,19 @@ impl Prioritize {
let sz = frame.payload().remaining();
tracing::trace!(
" --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \
available={}; requested={}; buffered={};",
frame.stream_id(),
sz,
frame.is_end_stream(),
stream_capacity,
stream.send_flow.available(),
stream.requested_send_capacity,
stream.buffered_send_data,
eos = frame.is_end_stream(),
window = %stream_capacity,
available = %stream.send_flow.available(),
requested = stream.requested_send_capacity,
buffered = stream.buffered_send_data,
"data frame"
);
// Zero length data frames always have capacity to
// be sent.
if sz > 0 && stream_capacity == 0 {
tracing::trace!(
" --> stream capacity is 0; requested={}",
stream.requested_send_capacity
);
tracing::trace!("stream capacity is 0");
// Ensure that the stream is waiting for
// connection level capacity
@@ -721,34 +714,38 @@ impl Prioritize {
// capacity at this point.
debug_assert!(len <= self.flow.window_size());
tracing::trace!(" --> sending data frame; len={}", len);
tracing::trace!(len, "sending data frame");
// Update the flow control
tracing::trace!(" -- updating stream flow --");
stream.send_flow.send_data(len);
tracing::trace_span!("updating stream flow").in_scope(|| {
stream.send_flow.send_data(len);
// Decrement the stream's buffered data counter
debug_assert!(stream.buffered_send_data >= len);
stream.buffered_send_data -= len;
stream.requested_send_capacity -= len;
// Decrement the stream's buffered data counter
debug_assert!(stream.buffered_send_data >= len);
stream.buffered_send_data -= len;
stream.requested_send_capacity -= len;
// Assign the capacity back to the connection that
// was just consumed from the stream in the previous
// line.
self.flow.assign_capacity(len);
// Assign the capacity back to the connection that
// was just consumed from the stream in the previous
// line.
self.flow.assign_capacity(len);
});
tracing::trace!(" -- updating connection flow --");
self.flow.send_data(len);
let (eos, len) = tracing::trace_span!("updating connection flow")
.in_scope(|| {
self.flow.send_data(len);
// Wrap the frame's data payload to ensure that the
// correct amount of data gets written.
// Wrap the frame's data payload to ensure that the
// correct amount of data gets written.
let eos = frame.is_end_stream();
let len = len as usize;
let eos = frame.is_end_stream();
let len = len as usize;
if frame.payload().remaining() > len {
frame.set_end_stream(false);
}
if frame.payload().remaining() > len {
frame.set_end_stream(false);
}
(eos, len)
});
Frame::Data(frame.map(|buf| Prioritized {
inner: buf.take(len),

View File

@@ -128,6 +128,7 @@ use std::task::{Context, Poll};
use std::time::Duration;
use std::{convert, fmt, io, mem};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing_futures::{Instrument, Instrumented};
/// In progress HTTP/2.0 connection handshake future.
///
@@ -149,6 +150,8 @@ pub struct Handshake<T, B: Buf = Bytes> {
builder: Builder,
/// The current state of the handshake.
state: Handshaking<T, B>,
/// Span tracking the handshake
span: tracing::Span,
}
/// Accepts inbound HTTP/2.0 streams on a connection.
@@ -290,9 +293,9 @@ impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
/// Stages of an in-progress handshake.
enum Handshaking<T, B: Buf> {
/// State 1. Connection is flushing pending SETTINGS frame.
Flushing(Flush<T, Prioritized<B>>),
Flushing(Instrumented<Flush<T, Prioritized<B>>>),
/// State 2. Connection is waiting for the client preface.
ReadingPreface(ReadPreface<T, Prioritized<B>>),
ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
/// Dummy state for `mem::replace`.
Empty,
}
@@ -359,6 +362,9 @@ where
B: Buf + 'static,
{
fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
let span = tracing::trace_span!("server_handshake", io = %std::any::type_name::<T>());
let entered = span.enter();
// Create the codec.
let mut codec = Codec::new(io);
@@ -378,7 +384,13 @@ where
// Create the handshake future.
let state = Handshaking::from(codec);
Handshake { builder, state }
drop(entered);
Handshake {
builder,
state,
span,
}
}
/// Accept the next incoming request on this connection.
@@ -1179,7 +1191,9 @@ where
type Output = Result<Connection<T, B>, crate::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
tracing::trace!("Handshake::poll(); state={:?};", self.state);
let span = self.span.clone(); // XXX(eliza): T_T
let _e = span.enter();
tracing::trace!(state = ?self.state);
use crate::server::Handshaking::*;
self.state = if let Flushing(ref mut flush) = self.state {
@@ -1188,11 +1202,11 @@ where
// for the client preface.
let codec = match Pin::new(flush).poll(cx)? {
Poll::Pending => {
tracing::trace!("Handshake::poll(); flush.poll()=Pending");
tracing::trace!(flush.poll = %"Pending");
return Poll::Pending;
}
Poll::Ready(flushed) => {
tracing::trace!("Handshake::poll(); flush.poll()=Ready");
tracing::trace!(flush.poll = %"Ready");
flushed
}
};
@@ -1229,7 +1243,7 @@ where
},
);
tracing::trace!("Handshake::poll(); connection established!");
tracing::trace!("connection established!");
let mut c = Connection { connection };
if let Some(sz) = self.builder.initial_target_connection_window_size {
c.set_target_window_size(sz);
@@ -1290,14 +1304,14 @@ impl Peer {
use PushPromiseHeaderError::*;
match e {
NotSafeAndCacheable => tracing::debug!(
"convert_push_message: method {} is not safe and cacheable; promised_id={:?}",
?promised_id,
"convert_push_message: method {} is not safe and cacheable",
request.method(),
promised_id,
),
InvalidContentLength(e) => tracing::debug!(
"convert_push_message; promised request has invalid content-length {:?}; promised_id={:?}",
?promised_id,
"convert_push_message; promised request has invalid content-length {:?}",
e,
promised_id,
),
}
return Err(UserError::MalformedHeaders);
@@ -1328,6 +1342,8 @@ impl Peer {
impl proto::Peer for Peer {
type Poll = Request<()>;
const NAME: &'static str = "Server";
fn is_server() -> bool {
true
}
@@ -1471,7 +1487,7 @@ where
{
#[inline]
fn from(flush: Flush<T, Prioritized<B>>) -> Self {
Handshaking::Flushing(flush)
Handshaking::Flushing(flush.instrument(tracing::trace_span!("flush")))
}
}
@@ -1482,7 +1498,7 @@ where
{
#[inline]
fn from(read: ReadPreface<T, Prioritized<B>>) -> Self {
Handshaking::ReadingPreface(read)
Handshaking::ReadingPreface(read.instrument(tracing::trace_span!("read_preface")))
}
}