feat(log): improve quality of debug level logs
This commit is contained in:
@@ -103,7 +103,7 @@ impl Service for HttpConnector {
|
|||||||
type Future = HttpConnecting;
|
type Future = HttpConnecting;
|
||||||
|
|
||||||
fn call(&self, uri: Uri) -> Self::Future {
|
fn call(&self, uri: Uri) -> Self::Future {
|
||||||
debug!("Http::connect({:?})", uri);
|
trace!("Http::connect({:?})", uri);
|
||||||
|
|
||||||
if self.enforce_http {
|
if self.enforce_http {
|
||||||
if uri.scheme() != Some("http") {
|
if uri.scheme() != Some("http") {
|
||||||
@@ -241,14 +241,14 @@ impl ConnectingTcp {
|
|||||||
trace!("connect error {:?}", e);
|
trace!("connect error {:?}", e);
|
||||||
err = Some(e);
|
err = Some(e);
|
||||||
if let Some(addr) = self.addrs.next() {
|
if let Some(addr) = self.addrs.next() {
|
||||||
debug!("connecting to {:?}", addr);
|
debug!("connecting to {}", addr);
|
||||||
*current = TcpStream::connect(&addr, handle);
|
*current = TcpStream::connect(&addr, handle);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if let Some(addr) = self.addrs.next() {
|
} else if let Some(addr) = self.addrs.next() {
|
||||||
debug!("connecting to {:?}", addr);
|
debug!("connecting to {}", addr);
|
||||||
self.current = Some(TcpStream::connect(&addr, handle));
|
self.current = Some(TcpStream::connect(&addr, handle));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ impl Future for Work {
|
|||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
debug!("resolve host={:?}, port={:?}", self.host, self.port);
|
debug!("resolving host={:?}, port={:?}", self.host, self.port);
|
||||||
(&*self.host, self.port).to_socket_addrs()
|
(&*self.host, self.port).to_socket_addrs()
|
||||||
.map(|i| Async::Ready(IpAddrs { iter: i }))
|
.map(|i| Async::Ready(IpAddrs { iter: i }))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -85,6 +85,7 @@ impl<T: Clone> Pool<T> {
|
|||||||
|
|
||||||
match entry {
|
match entry {
|
||||||
Some(entry) => {
|
Some(entry) => {
|
||||||
|
debug!("pooling idle connection for {:?}", key);
|
||||||
inner.idle.entry(key)
|
inner.idle.entry(key)
|
||||||
.or_insert(Vec::new())
|
.or_insert(Vec::new())
|
||||||
.push(entry);
|
.push(entry);
|
||||||
@@ -95,7 +96,6 @@ impl<T: Clone> Pool<T> {
|
|||||||
|
|
||||||
|
|
||||||
pub fn pooled(&self, key: Rc<String>, value: T) -> Pooled<T> {
|
pub fn pooled(&self, key: Rc<String>, value: T) -> Pooled<T> {
|
||||||
trace!("Pool::pooled {:?}", key);
|
|
||||||
Pooled {
|
Pooled {
|
||||||
entry: Entry {
|
entry: Entry {
|
||||||
value: value,
|
value: value,
|
||||||
@@ -112,7 +112,7 @@ impl<T: Clone> Pool<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn reuse(&self, key: Rc<String>, mut entry: Entry<T>) -> Pooled<T> {
|
fn reuse(&self, key: Rc<String>, mut entry: Entry<T>) -> Pooled<T> {
|
||||||
trace!("Pool::reuse {:?}", key);
|
trace!("reuse {:?}", key);
|
||||||
entry.is_reused = true;
|
entry.is_reused = true;
|
||||||
entry.status.set(TimedKA::Busy);
|
entry.status.set(TimedKA::Busy);
|
||||||
Pooled {
|
Pooled {
|
||||||
@@ -123,7 +123,7 @@ impl<T: Clone> Pool<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn park(&mut self, key: Rc<String>, tx: relay::Sender<Entry<T>>) {
|
fn park(&mut self, key: Rc<String>, tx: relay::Sender<Entry<T>>) {
|
||||||
trace!("Pool::park {:?}", key);
|
trace!("park; waiting for idle connection: {:?}", key);
|
||||||
self.inner.borrow_mut()
|
self.inner.borrow_mut()
|
||||||
.parked.entry(key)
|
.parked.entry(key)
|
||||||
.or_insert(VecDeque::new())
|
.or_insert(VecDeque::new())
|
||||||
@@ -133,7 +133,7 @@ impl<T: Clone> Pool<T> {
|
|||||||
|
|
||||||
impl<T> Pool<T> {
|
impl<T> Pool<T> {
|
||||||
fn clean_parked(&mut self, key: &Rc<String>) {
|
fn clean_parked(&mut self, key: &Rc<String>) {
|
||||||
trace!("Pool::clean_parked {:?}", key);
|
trace!("clean_parked {:?}", key);
|
||||||
let mut inner = self.inner.borrow_mut();
|
let mut inner = self.inner.borrow_mut();
|
||||||
|
|
||||||
let mut remove_parked = false;
|
let mut remove_parked = false;
|
||||||
@@ -285,7 +285,7 @@ impl<T: Clone> Future for Checkout<T> {
|
|||||||
while let Some(entry) = list.pop() {
|
while let Some(entry) = list.pop() {
|
||||||
match entry.status.get() {
|
match entry.status.get() {
|
||||||
TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => {
|
TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => {
|
||||||
trace!("Checkout::poll found idle client for {:?}", key);
|
debug!("found idle connection for {:?}", key);
|
||||||
should_remove = list.is_empty();
|
should_remove = list.is_empty();
|
||||||
return Some(entry);
|
return Some(entry);
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -169,6 +169,9 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
debug!("incoming body is {}", decoder);
|
||||||
|
|
||||||
self.state.busy();
|
self.state.busy();
|
||||||
if head.expecting_continue() {
|
if head.expecting_continue() {
|
||||||
let msg = b"HTTP/1.1 100 Continue\r\n\r\n";
|
let msg = b"HTTP/1.1 100 Continue\r\n\r\n";
|
||||||
@@ -203,8 +206,11 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
if !slice.is_empty() {
|
if !slice.is_empty() {
|
||||||
return Ok(Async::Ready(Some(super::Chunk::from(slice))));
|
return Ok(Async::Ready(Some(super::Chunk::from(slice))));
|
||||||
} else if decoder.is_eof() {
|
} else if decoder.is_eof() {
|
||||||
|
debug!("incoming body completed");
|
||||||
(Reading::KeepAlive, Ok(Async::Ready(None)))
|
(Reading::KeepAlive, Ok(Async::Ready(None)))
|
||||||
} else {
|
} else {
|
||||||
|
trace!("decode stream unexpectedly ended");
|
||||||
|
//TODO: Should this return an UnexpectedEof?
|
||||||
(Reading::Closed, Ok(Async::Ready(None)))
|
(Reading::Closed, Ok(Async::Ready(None)))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -240,10 +246,12 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
assert!(!self.can_read_head() && !self.can_read_body());
|
assert!(!self.can_read_head() && !self.can_read_body());
|
||||||
|
|
||||||
if !self.io.read_buf().is_empty() {
|
if !self.io.read_buf().is_empty() {
|
||||||
|
debug!("received an unexpected {} bytes", self.io.read_buf().len());
|
||||||
Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended"))
|
Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended"))
|
||||||
} else {
|
} else {
|
||||||
match self.io.read_from_io() {
|
match self.io.read_from_io() {
|
||||||
Ok(Async::Ready(0)) => {
|
Ok(Async::Ready(0)) => {
|
||||||
|
trace!("try_empty_read; found EOF on connection");
|
||||||
self.state.close_read();
|
self.state.close_read();
|
||||||
let must_error = !self.state.is_idle() && T::should_error_on_parse_eof();
|
let must_error = !self.state.is_idle() && T::should_error_on_parse_eof();
|
||||||
if must_error {
|
if must_error {
|
||||||
@@ -252,11 +260,11 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok(Async::Ready(_)) => {
|
Ok(Async::Ready(n)) => {
|
||||||
|
debug!("received {} bytes on an idle connection", n);
|
||||||
Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended"))
|
Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended"))
|
||||||
},
|
},
|
||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => {
|
||||||
trace!("try_empty_read; read blocked");
|
|
||||||
Ok(())
|
Ok(())
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -616,7 +624,7 @@ where I: AsyncRead + AsyncWrite + 'static,
|
|||||||
K: KeepAlive + 'static,
|
K: KeepAlive + 'static,
|
||||||
T::Outgoing: fmt::Debug {}
|
T::Outgoing: fmt::Debug {}
|
||||||
|
|
||||||
impl<I, B: AsRef<[u8]>, T, K: fmt::Debug> fmt::Debug for Conn<I, B, T, K> {
|
impl<I, B: AsRef<[u8]>, T, K: KeepAlive> fmt::Debug for Conn<I, B, T, K> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
f.debug_struct("Conn")
|
f.debug_struct("Conn")
|
||||||
.field("state", &self.state)
|
.field("state", &self.state)
|
||||||
@@ -650,13 +658,13 @@ enum Writing<B> {
|
|||||||
Closed,
|
Closed,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: AsRef<[u8]>, K: fmt::Debug> fmt::Debug for State<B, K> {
|
impl<B: AsRef<[u8]>, K: KeepAlive> fmt::Debug for State<B, K> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
f.debug_struct("State")
|
f.debug_struct("State")
|
||||||
.field("reading", &self.reading)
|
.field("reading", &self.reading)
|
||||||
.field("writing", &self.writing)
|
.field("writing", &self.writing)
|
||||||
.field("keep_alive", &self.keep_alive)
|
.field("keep_alive", &self.keep_alive.status())
|
||||||
.field("method", &self.method)
|
//.field("method", &self.method)
|
||||||
.field("read_task", &self.read_task)
|
.field("read_task", &self.read_task)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use std::fmt;
|
||||||
use std::usize;
|
use std::usize;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
@@ -11,26 +12,12 @@ use self::Kind::{Length, Chunked, Eof};
|
|||||||
///
|
///
|
||||||
/// If a message body does not include a Transfer-Encoding, it *should*
|
/// If a message body does not include a Transfer-Encoding, it *should*
|
||||||
/// include a Content-Length header.
|
/// include a Content-Length header.
|
||||||
#[derive(Debug, Clone, PartialEq)]
|
#[derive(Clone, PartialEq)]
|
||||||
pub struct Decoder {
|
pub struct Decoder {
|
||||||
kind: Kind,
|
kind: Kind,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Decoder {
|
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||||
pub fn length(x: u64) -> Decoder {
|
|
||||||
Decoder { kind: Kind::Length(x) }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn chunked() -> Decoder {
|
|
||||||
Decoder { kind: Kind::Chunked(ChunkedState::Size, 0) }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn eof() -> Decoder {
|
|
||||||
Decoder { kind: Kind::Eof(false) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq)]
|
|
||||||
enum Kind {
|
enum Kind {
|
||||||
/// A Reader used when a Content-Length header is passed with a positive integer.
|
/// A Reader used when a Content-Length header is passed with a positive integer.
|
||||||
Length(u64),
|
Length(u64),
|
||||||
@@ -38,6 +25,8 @@ enum Kind {
|
|||||||
Chunked(ChunkedState, u64),
|
Chunked(ChunkedState, u64),
|
||||||
/// A Reader used for responses that don't indicate a length or chunked.
|
/// A Reader used for responses that don't indicate a length or chunked.
|
||||||
///
|
///
|
||||||
|
/// The bool tracks when EOF is seen on the transport.
|
||||||
|
///
|
||||||
/// Note: This should only used for `Response`s. It is illegal for a
|
/// Note: This should only used for `Response`s. It is illegal for a
|
||||||
/// `Request` to be made with both `Content-Length` and
|
/// `Request` to be made with both `Content-Length` and
|
||||||
/// `Transfer-Encoding: chunked` missing, as explained from the spec:
|
/// `Transfer-Encoding: chunked` missing, as explained from the spec:
|
||||||
@@ -53,7 +42,7 @@ enum Kind {
|
|||||||
Eof(bool),
|
Eof(bool),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Clone)]
|
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||||
enum ChunkedState {
|
enum ChunkedState {
|
||||||
Size,
|
Size,
|
||||||
SizeLws,
|
SizeLws,
|
||||||
@@ -68,6 +57,22 @@ enum ChunkedState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Decoder {
|
impl Decoder {
|
||||||
|
// constructors
|
||||||
|
|
||||||
|
pub fn length(x: u64) -> Decoder {
|
||||||
|
Decoder { kind: Kind::Length(x) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn chunked() -> Decoder {
|
||||||
|
Decoder { kind: Kind::Chunked(ChunkedState::Size, 0) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn eof() -> Decoder {
|
||||||
|
Decoder { kind: Kind::Eof(false) }
|
||||||
|
}
|
||||||
|
|
||||||
|
// methods
|
||||||
|
|
||||||
pub fn is_eof(&self) -> bool {
|
pub fn is_eof(&self) -> bool {
|
||||||
trace!("is_eof? {:?}", self);
|
trace!("is_eof? {:?}", self);
|
||||||
match self.kind {
|
match self.kind {
|
||||||
@@ -77,9 +82,7 @@ impl Decoder {
|
|||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl Decoder {
|
|
||||||
pub fn decode<R: MemRead>(&mut self, body: &mut R) -> Poll<Bytes, io::Error> {
|
pub fn decode<R: MemRead>(&mut self, body: &mut R) -> Poll<Bytes, io::Error> {
|
||||||
match self.kind {
|
match self.kind {
|
||||||
Length(ref mut remaining) => {
|
Length(ref mut remaining) => {
|
||||||
@@ -131,6 +134,23 @@ impl Decoder {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl fmt::Debug for Decoder {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
fmt::Debug::fmt(&self.kind, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Decoder {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
match self.kind {
|
||||||
|
Kind::Length(n) => write!(f, "content-length ({} bytes)", n),
|
||||||
|
Kind::Chunked(..) => f.write_str("chunked encoded"),
|
||||||
|
Kind::Eof(..) => f.write_str("until end-of-file"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
macro_rules! byte (
|
macro_rules! byte (
|
||||||
($rdr:ident) => ({
|
($rdr:ident) => ({
|
||||||
let buf = try_ready!($rdr.read_mem(1));
|
let buf = try_ready!($rdr.read_mem(1));
|
||||||
@@ -154,7 +174,7 @@ impl ChunkedState {
|
|||||||
Size => ChunkedState::read_size(body, size),
|
Size => ChunkedState::read_size(body, size),
|
||||||
SizeLws => ChunkedState::read_size_lws(body),
|
SizeLws => ChunkedState::read_size_lws(body),
|
||||||
Extension => ChunkedState::read_extension(body),
|
Extension => ChunkedState::read_extension(body),
|
||||||
SizeLf => ChunkedState::read_size_lf(body, size),
|
SizeLf => ChunkedState::read_size_lf(body, *size),
|
||||||
Body => ChunkedState::read_body(body, size, buf),
|
Body => ChunkedState::read_body(body, size, buf),
|
||||||
BodyCr => ChunkedState::read_body_cr(body),
|
BodyCr => ChunkedState::read_body_cr(body),
|
||||||
BodyLf => ChunkedState::read_body_lf(body),
|
BodyLf => ChunkedState::read_body_lf(body),
|
||||||
@@ -209,11 +229,17 @@ impl ChunkedState {
|
|||||||
_ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions
|
_ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn read_size_lf<R: MemRead>(rdr: &mut R, size: &mut u64) -> Poll<ChunkedState, io::Error> {
|
fn read_size_lf<R: MemRead>(rdr: &mut R, size: u64) -> Poll<ChunkedState, io::Error> {
|
||||||
trace!("Chunk size is {:?}", size);
|
trace!("Chunk size is {:?}", size);
|
||||||
match byte!(rdr) {
|
match byte!(rdr) {
|
||||||
b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)),
|
b'\n' => {
|
||||||
b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)),
|
if size == 0 {
|
||||||
|
Ok(Async::Ready(ChunkedState::EndCr))
|
||||||
|
} else {
|
||||||
|
debug!("incoming chunked header: {0:#X} ({0} bytes)", size);
|
||||||
|
Ok(Async::Ready(ChunkedState::Body))
|
||||||
|
}
|
||||||
|
},
|
||||||
_ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk size LF")),
|
_ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk size LF")),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -70,9 +70,9 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
|
|||||||
pub fn parse<S: Http1Transaction>(&mut self) -> Poll<MessageHead<S::Incoming>, ::Error> {
|
pub fn parse<S: Http1Transaction>(&mut self) -> Poll<MessageHead<S::Incoming>, ::Error> {
|
||||||
loop {
|
loop {
|
||||||
match try!(S::parse(&mut self.read_buf)) {
|
match try!(S::parse(&mut self.read_buf)) {
|
||||||
Some(head) => {
|
Some((head, len)) => {
|
||||||
//trace!("parsed {} bytes out of {}", len, self.read_buf.len());
|
debug!("parsed {} headers ({} bytes)", head.headers.len(), len);
|
||||||
return Ok(Async::Ready(head.0))
|
return Ok(Async::Ready(head))
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
if self.read_buf.capacity() >= MAX_BUFFER_SIZE {
|
if self.read_buf.capacity() >= MAX_BUFFER_SIZE {
|
||||||
@@ -118,6 +118,7 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
|
|||||||
return Err(e)
|
return Err(e)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
debug!("read {} bytes", n);
|
||||||
self.read_buf.advance_mut(n);
|
self.read_buf.advance_mut(n);
|
||||||
Ok(Async::Ready(n))
|
Ok(Async::Ready(n))
|
||||||
}
|
}
|
||||||
@@ -154,7 +155,7 @@ impl<T: Write> Write for Buffered<T> {
|
|||||||
} else {
|
} else {
|
||||||
loop {
|
loop {
|
||||||
let n = try!(self.write_buf.write_into(&mut self.io));
|
let n = try!(self.write_buf.write_into(&mut self.io));
|
||||||
trace!("flushed {} bytes", n);
|
debug!("flushed {} bytes", n);
|
||||||
if self.write_buf.remaining() == 0 {
|
if self.write_buf.remaining() == 0 {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user