diff --git a/src/client/connect.rs b/src/client/connect.rs index d9c918e0..808c6b21 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -103,7 +103,7 @@ impl Service for HttpConnector { type Future = HttpConnecting; fn call(&self, uri: Uri) -> Self::Future { - debug!("Http::connect({:?})", uri); + trace!("Http::connect({:?})", uri); if self.enforce_http { if uri.scheme() != Some("http") { @@ -241,14 +241,14 @@ impl ConnectingTcp { trace!("connect error {:?}", e); err = Some(e); if let Some(addr) = self.addrs.next() { - debug!("connecting to {:?}", addr); + debug!("connecting to {}", addr); *current = TcpStream::connect(&addr, handle); continue; } } } } else if let Some(addr) = self.addrs.next() { - debug!("connecting to {:?}", addr); + debug!("connecting to {}", addr); self.current = Some(TcpStream::connect(&addr, handle)); continue; } diff --git a/src/client/dns.rs b/src/client/dns.rs index 5bf1ccc2..182481d3 100644 --- a/src/client/dns.rs +++ b/src/client/dns.rs @@ -24,7 +24,7 @@ impl Future for Work { type Error = io::Error; fn poll(&mut self) -> Poll { - debug!("resolve host={:?}, port={:?}", self.host, self.port); + debug!("resolving host={:?}, port={:?}", self.host, self.port); (&*self.host, self.port).to_socket_addrs() .map(|i| Async::Ready(IpAddrs { iter: i })) } diff --git a/src/client/pool.rs b/src/client/pool.rs index f582ccb5..f56403e3 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -85,6 +85,7 @@ impl Pool { match entry { Some(entry) => { + debug!("pooling idle connection for {:?}", key); inner.idle.entry(key) .or_insert(Vec::new()) .push(entry); @@ -95,7 +96,6 @@ impl Pool { pub fn pooled(&self, key: Rc, value: T) -> Pooled { - trace!("Pool::pooled {:?}", key); Pooled { entry: Entry { value: value, @@ -112,7 +112,7 @@ impl Pool { } fn reuse(&self, key: Rc, mut entry: Entry) -> Pooled { - trace!("Pool::reuse {:?}", key); + trace!("reuse {:?}", key); entry.is_reused = true; entry.status.set(TimedKA::Busy); Pooled { @@ -123,7 +123,7 @@ impl Pool { } fn park(&mut self, key: Rc, tx: relay::Sender>) { - trace!("Pool::park {:?}", key); + trace!("park; waiting for idle connection: {:?}", key); self.inner.borrow_mut() .parked.entry(key) .or_insert(VecDeque::new()) @@ -133,7 +133,7 @@ impl Pool { impl Pool { fn clean_parked(&mut self, key: &Rc) { - trace!("Pool::clean_parked {:?}", key); + trace!("clean_parked {:?}", key); let mut inner = self.inner.borrow_mut(); let mut remove_parked = false; @@ -285,7 +285,7 @@ impl Future for Checkout { while let Some(entry) = list.pop() { match entry.status.get() { TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => { - trace!("Checkout::poll found idle client for {:?}", key); + debug!("found idle connection for {:?}", key); should_remove = list.is_empty(); return Some(entry); }, diff --git a/src/proto/conn.rs b/src/proto/conn.rs index 31bc06a9..51ff7902 100644 --- a/src/proto/conn.rs +++ b/src/proto/conn.rs @@ -169,6 +169,9 @@ where I: AsyncRead + AsyncWrite, return Err(e); } }; + + debug!("incoming body is {}", decoder); + self.state.busy(); if head.expecting_continue() { let msg = b"HTTP/1.1 100 Continue\r\n\r\n"; @@ -203,8 +206,11 @@ where I: AsyncRead + AsyncWrite, if !slice.is_empty() { return Ok(Async::Ready(Some(super::Chunk::from(slice)))); } else if decoder.is_eof() { + debug!("incoming body completed"); (Reading::KeepAlive, Ok(Async::Ready(None))) } else { + trace!("decode stream unexpectedly ended"); + //TODO: Should this return an UnexpectedEof? (Reading::Closed, Ok(Async::Ready(None))) } @@ -240,10 +246,12 @@ where I: AsyncRead + AsyncWrite, assert!(!self.can_read_head() && !self.can_read_body()); if !self.io.read_buf().is_empty() { + debug!("received an unexpected {} bytes", self.io.read_buf().len()); Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended")) } else { match self.io.read_from_io() { Ok(Async::Ready(0)) => { + trace!("try_empty_read; found EOF on connection"); self.state.close_read(); let must_error = !self.state.is_idle() && T::should_error_on_parse_eof(); if must_error { @@ -252,11 +260,11 @@ where I: AsyncRead + AsyncWrite, Ok(()) } }, - Ok(Async::Ready(_)) => { + Ok(Async::Ready(n)) => { + debug!("received {} bytes on an idle connection", n); Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended")) }, Ok(Async::NotReady) => { - trace!("try_empty_read; read blocked"); Ok(()) }, Err(e) => { @@ -616,7 +624,7 @@ where I: AsyncRead + AsyncWrite + 'static, K: KeepAlive + 'static, T::Outgoing: fmt::Debug {} -impl, T, K: fmt::Debug> fmt::Debug for Conn { +impl, T, K: KeepAlive> fmt::Debug for Conn { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Conn") .field("state", &self.state) @@ -650,13 +658,13 @@ enum Writing { Closed, } -impl, K: fmt::Debug> fmt::Debug for State { +impl, K: KeepAlive> fmt::Debug for State { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("State") .field("reading", &self.reading) .field("writing", &self.writing) - .field("keep_alive", &self.keep_alive) - .field("method", &self.method) + .field("keep_alive", &self.keep_alive.status()) + //.field("method", &self.method) .field("read_task", &self.read_task) .finish() } diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index ea6227da..4c45bf07 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::usize; use std::io; @@ -11,26 +12,12 @@ use self::Kind::{Length, Chunked, Eof}; /// /// If a message body does not include a Transfer-Encoding, it *should* /// include a Content-Length header. -#[derive(Debug, Clone, PartialEq)] +#[derive(Clone, PartialEq)] pub struct Decoder { kind: Kind, } -impl Decoder { - pub fn length(x: u64) -> Decoder { - Decoder { kind: Kind::Length(x) } - } - - pub fn chunked() -> Decoder { - Decoder { kind: Kind::Chunked(ChunkedState::Size, 0) } - } - - pub fn eof() -> Decoder { - Decoder { kind: Kind::Eof(false) } - } -} - -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq)] enum Kind { /// A Reader used when a Content-Length header is passed with a positive integer. Length(u64), @@ -38,6 +25,8 @@ enum Kind { Chunked(ChunkedState, u64), /// A Reader used for responses that don't indicate a length or chunked. /// + /// The bool tracks when EOF is seen on the transport. + /// /// Note: This should only used for `Response`s. It is illegal for a /// `Request` to be made with both `Content-Length` and /// `Transfer-Encoding: chunked` missing, as explained from the spec: @@ -53,7 +42,7 @@ enum Kind { Eof(bool), } -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Clone, Copy)] enum ChunkedState { Size, SizeLws, @@ -68,6 +57,22 @@ enum ChunkedState { } impl Decoder { + // constructors + + pub fn length(x: u64) -> Decoder { + Decoder { kind: Kind::Length(x) } + } + + pub fn chunked() -> Decoder { + Decoder { kind: Kind::Chunked(ChunkedState::Size, 0) } + } + + pub fn eof() -> Decoder { + Decoder { kind: Kind::Eof(false) } + } + + // methods + pub fn is_eof(&self) -> bool { trace!("is_eof? {:?}", self); match self.kind { @@ -77,9 +82,7 @@ impl Decoder { _ => false, } } -} -impl Decoder { pub fn decode(&mut self, body: &mut R) -> Poll { match self.kind { Length(ref mut remaining) => { @@ -131,6 +134,23 @@ impl Decoder { } } + +impl fmt::Debug for Decoder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.kind, f) + } +} + +impl fmt::Display for Decoder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.kind { + Kind::Length(n) => write!(f, "content-length ({} bytes)", n), + Kind::Chunked(..) => f.write_str("chunked encoded"), + Kind::Eof(..) => f.write_str("until end-of-file"), + } + } +} + macro_rules! byte ( ($rdr:ident) => ({ let buf = try_ready!($rdr.read_mem(1)); @@ -154,7 +174,7 @@ impl ChunkedState { Size => ChunkedState::read_size(body, size), SizeLws => ChunkedState::read_size_lws(body), Extension => ChunkedState::read_extension(body), - SizeLf => ChunkedState::read_size_lf(body, size), + SizeLf => ChunkedState::read_size_lf(body, *size), Body => ChunkedState::read_body(body, size, buf), BodyCr => ChunkedState::read_body_cr(body), BodyLf => ChunkedState::read_body_lf(body), @@ -209,11 +229,17 @@ impl ChunkedState { _ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions } } - fn read_size_lf(rdr: &mut R, size: &mut u64) -> Poll { + fn read_size_lf(rdr: &mut R, size: u64) -> Poll { trace!("Chunk size is {:?}", size); match byte!(rdr) { - b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)), - b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)), + b'\n' => { + if size == 0 { + Ok(Async::Ready(ChunkedState::EndCr)) + } else { + debug!("incoming chunked header: {0:#X} ({0} bytes)", size); + Ok(Async::Ready(ChunkedState::Body)) + } + }, _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk size LF")), } } diff --git a/src/proto/io.rs b/src/proto/io.rs index 1cc0a1e4..3ce67fdd 100644 --- a/src/proto/io.rs +++ b/src/proto/io.rs @@ -70,9 +70,9 @@ impl Buffered { pub fn parse(&mut self) -> Poll, ::Error> { loop { match try!(S::parse(&mut self.read_buf)) { - Some(head) => { - //trace!("parsed {} bytes out of {}", len, self.read_buf.len()); - return Ok(Async::Ready(head.0)) + Some((head, len)) => { + debug!("parsed {} headers ({} bytes)", head.headers.len(), len); + return Ok(Async::Ready(head)) }, None => { if self.read_buf.capacity() >= MAX_BUFFER_SIZE { @@ -118,6 +118,7 @@ impl Buffered { return Err(e) } }; + debug!("read {} bytes", n); self.read_buf.advance_mut(n); Ok(Async::Ready(n)) } @@ -154,7 +155,7 @@ impl Write for Buffered { } else { loop { let n = try!(self.write_buf.write_into(&mut self.io)); - trace!("flushed {} bytes", n); + debug!("flushed {} bytes", n); if self.write_buf.remaining() == 0 { break; }