From c33b9d4e16d0532a47359d969484353254820968 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 25 Jan 2018 14:40:09 -0800 Subject: [PATCH] refactor(proto): move more h1-specific pieces into h1 module --- src/proto/{ => h1}/conn.rs | 26 +++++++++++++------------- src/proto/h1/decode.rs | 5 +++-- src/proto/{ => h1}/dispatch.rs | 12 ++++++------ src/proto/h1/encode.rs | 2 +- src/proto/{ => h1}/io.rs | 4 ++-- src/proto/h1/mod.rs | 6 +++++- src/proto/h1/{parse.rs => role.rs} | 0 src/proto/mod.rs | 5 +---- 8 files changed, 31 insertions(+), 29 deletions(-) rename src/proto/{ => h1}/conn.rs (97%) rename src/proto/{ => h1}/dispatch.rs (97%) rename src/proto/{ => h1}/io.rs (98%) rename src/proto/h1/{parse.rs => role.rs} (100%) diff --git a/src/proto/conn.rs b/src/proto/h1/conn.rs similarity index 97% rename from src/proto/conn.rs rename to src/proto/h1/conn.rs index e63c4713..a5e0e450 100644 --- a/src/proto/conn.rs +++ b/src/proto/h1/conn.rs @@ -10,9 +10,9 @@ use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "tokio-proto")] use tokio_proto::streaming::pipeline::{Frame, Transport}; -use proto::Http1Transaction; +use proto::{Chunk, Http1Transaction, MessageHead}; use super::io::{Cursor, Buffered}; -use super::h1::{EncodedBuf, Encoder, Decoder}; +use super::{EncodedBuf, Encoder, Decoder}; use method::Method; use version::HttpVersion; @@ -63,7 +63,7 @@ where I: AsyncRead + AsyncWrite, } #[cfg(feature = "tokio-proto")] - fn poll_incoming(&mut self) -> Poll, super::Chunk, ::Error>>, io::Error> { + fn poll_incoming(&mut self) -> Poll, Chunk, ::Error>>, io::Error> { trace!("Conn::poll_incoming()"); #[derive(Debug)] @@ -163,7 +163,7 @@ where I: AsyncRead + AsyncWrite, T::should_error_on_parse_eof() && !self.state.is_idle() } - pub fn read_head(&mut self) -> Poll, bool)>, ::Error> { + pub fn read_head(&mut self) -> Poll, bool)>, ::Error> { debug_assert!(self.can_read_head()); trace!("Conn::read_head"); @@ -236,7 +236,7 @@ where I: AsyncRead + AsyncWrite, } } - pub fn read_body(&mut self) -> Poll, io::Error> { + pub fn read_body(&mut self) -> Poll, io::Error> { debug_assert!(self.can_read_body()); trace!("Conn::read_body"); @@ -246,7 +246,7 @@ where I: AsyncRead + AsyncWrite, match decoder.decode(&mut self.io) { Ok(Async::Ready(slice)) => { let (reading, chunk) = if !slice.is_empty() { - return Ok(Async::Ready(Some(super::Chunk::from(slice)))); + return Ok(Async::Ready(Some(Chunk::from(slice)))); } else if decoder.is_eof() { debug!("incoming body completed"); (Reading::KeepAlive, None) @@ -415,7 +415,7 @@ where I: AsyncRead + AsyncWrite, self.io.can_buffer() } - pub fn write_head(&mut self, mut head: super::MessageHead, body: bool) { + pub fn write_head(&mut self, mut head: MessageHead, body: bool) { debug_assert!(self.can_write_head()); self.enforce_version(&mut head); @@ -438,7 +438,7 @@ where I: AsyncRead + AsyncWrite, // If we know the remote speaks an older version, we try to fix up any messages // to work with our older peer. - fn enforce_version(&mut self, head: &mut super::MessageHead) { + fn enforce_version(&mut self, head: &mut MessageHead) { use header::Connection; let wants_keep_alive = if self.state.wants_keep_alive() { @@ -591,7 +591,7 @@ where I: AsyncRead + AsyncWrite, T: Http1Transaction, K: KeepAlive, T::Outgoing: fmt::Debug { - type Item = Frame, super::Chunk, ::Error>; + type Item = Frame, Chunk, ::Error>; type Error = io::Error; #[inline] @@ -610,7 +610,7 @@ where I: AsyncRead + AsyncWrite, T: Http1Transaction, K: KeepAlive, T::Outgoing: fmt::Debug { - type SinkItem = Frame, B, ::Error>; + type SinkItem = Frame, B, ::Error>; type SinkError = io::Error; #[inline] @@ -886,7 +886,7 @@ enum Version { // The DebugFrame and DebugChunk are simple Debug implementations that allow // us to dump the frame into logs, without logging the entirety of the bytes. #[cfg(feature = "tokio-proto")] -struct DebugFrame<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a>(&'a Frame, B, ::Error>); +struct DebugFrame<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a>(&'a Frame, B, ::Error>); #[cfg(feature = "tokio-proto")] impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a, T, B> { @@ -925,7 +925,7 @@ mod tests { use tokio_proto::streaming::pipeline::Frame; use proto::{self, ClientTransaction, MessageHead, ServerTransaction}; - use super::super::h1::Encoder; + use super::super::Encoder; use mock::AsyncIo; use super::{Conn, Decoder, Reading, Writing}; @@ -1118,7 +1118,7 @@ mod tests { let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 0); let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default()); - let max = ::proto::io::DEFAULT_MAX_BUFFER_SIZE + 4096; + let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096; conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64)); assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready()); diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index a8fd0fb9..4521547f 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -5,7 +5,8 @@ use std::io; use futures::{Async, Poll}; use bytes::Bytes; -use proto::io::MemRead; + +use super::io::MemRead; use self::Kind::{Length, Chunked, Eof}; @@ -320,7 +321,7 @@ mod tests { use std::io::Write; use super::Decoder; use super::ChunkedState; - use proto::io::MemRead; + use super::super::io::MemRead; use futures::{Async, Poll}; use bytes::{BytesMut, Bytes}; use mock::AsyncIo; diff --git a/src/proto/dispatch.rs b/src/proto/h1/dispatch.rs similarity index 97% rename from src/proto/dispatch.rs rename to src/proto/h1/dispatch.rs index 89c3a198..27e5b237 100644 --- a/src/proto/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -5,13 +5,13 @@ use futures::sync::{mpsc, oneshot}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_service::Service; -use super::{Body, Conn, KeepAlive, Http1Transaction, MessageHead, RequestHead, ResponseHead}; +use proto::{Body, Conn, KeepAlive, Http1Transaction, MessageHead, RequestHead, ResponseHead}; use ::StatusCode; pub struct Dispatcher { conn: Conn, dispatch: D, - body_tx: Option, + body_tx: Option<::proto::body::ChunkSender>, body_rx: Option, is_closing: bool, } @@ -156,7 +156,7 @@ where match self.conn.read_head() { Ok(Async::Ready(Some((head, has_body)))) => { let body = if has_body { - let (mut tx, rx) = super::body::channel(); + let (mut tx, rx) = ::proto::body::channel(); let _ = tx.poll_ready(); // register this task if rx is dropped self.body_tx = Some(tx); Some(rx) @@ -315,7 +315,7 @@ where return Ok(Async::NotReady); } }; - let (head, body) = super::response::split(resp); + let (head, body) = ::proto::response::split(resp); Ok(Async::Ready(Some((head.into(), body)))) } else { unreachable!("poll_msg shouldn't be called if no inflight"); @@ -324,7 +324,7 @@ where fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Option)>) -> ::Result<()> { let (msg, body) = msg?; - let req = super::request::from_wire(None, msg, body); + let req = ::proto::request::from_wire(None, msg, body); self.in_flight = Some(self.service.call(req)); Ok(()) } @@ -393,7 +393,7 @@ where match msg { Ok((msg, body)) => { if let Some(cb) = self.callback.take() { - let res = super::response::from_wire(msg, body); + let res = ::proto::response::from_wire(msg, body); let _ = cb.send(Ok(res)); Ok(()) } else { diff --git a/src/proto/h1/encode.rs b/src/proto/h1/encode.rs index 4fcfd604..cc33e97b 100644 --- a/src/proto/h1/encode.rs +++ b/src/proto/h1/encode.rs @@ -261,7 +261,7 @@ impl Buf for CrLf { mod tests { use bytes::{BufMut}; - use proto::io::Cursor; + use super::super::io::Cursor; use super::Encoder; #[test] diff --git a/src/proto/io.rs b/src/proto/h1/io.rs similarity index 98% rename from src/proto/io.rs rename to src/proto/h1/io.rs index 53ff6301..3483d4f8 100644 --- a/src/proto/io.rs +++ b/src/proto/h1/io.rs @@ -7,7 +7,7 @@ use futures::{Async, Poll}; use iovec::IoVec; use tokio_io::{AsyncRead, AsyncWrite}; -use super::{Http1Transaction, MessageHead}; +use proto::{Http1Transaction, MessageHead}; const INIT_BUFFER_SIZE: usize = 8192; pub const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; @@ -505,6 +505,6 @@ fn test_parse_reads_until_blocked() { let mock = AsyncIo::new(MockBuf::wrap(raw.into()), raw.len()); let mut buffered = Buffered::<_, Cursor>>::new(mock); - assert_eq!(buffered.parse::().unwrap(), Async::NotReady); + assert_eq!(buffered.parse::<::proto::ClientTransaction>().unwrap(), Async::NotReady); assert!(buffered.io.blocked()); } diff --git a/src/proto/h1/mod.rs b/src/proto/h1/mod.rs index dfd62906..04e34c58 100644 --- a/src/proto/h1/mod.rs +++ b/src/proto/h1/mod.rs @@ -1,8 +1,12 @@ +pub use self::conn::{Conn, KeepAlive, KA}; pub use self::decode::Decoder; pub use self::encode::{EncodedBuf, Encoder}; +mod conn; mod date; mod decode; +pub mod dispatch; mod encode; -pub mod parse; +mod io; +pub mod role; diff --git a/src/proto/h1/parse.rs b/src/proto/h1/role.rs similarity index 100% rename from src/proto/h1/parse.rs rename to src/proto/h1/role.rs diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 54576b44..24f077af 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -12,17 +12,14 @@ use uri::Uri; use version::HttpVersion; use version::HttpVersion::{Http10, Http11}; -pub use self::conn::{Conn, KeepAlive, KA}; pub use self::body::Body; #[cfg(feature = "tokio-proto")] pub use self::body::TokioBody; pub use self::chunk::Chunk; +pub use self::h1::{dispatch, Conn, KeepAlive, KA}; mod body; mod chunk; -mod conn; -pub mod dispatch; -mod io; mod h1; //mod h2; pub mod request;