Merge pull request #1362 from hyperium/unproto
feat(lib): add support to disable tokio-proto internals
This commit is contained in:
		| @@ -8,6 +8,8 @@ matrix: | |||||||
|           env: FEATURES="--features nightly" |           env: FEATURES="--features nightly" | ||||||
|         - rust: beta |         - rust: beta | ||||||
|         - rust: stable |         - rust: stable | ||||||
|  |         - rust: stable | ||||||
|  |           env: HYPER_NO_PROTO=1 | ||||||
|         - rust: stable |         - rust: stable | ||||||
|           env: FEATURES="--features compat" |           env: FEATURES="--features compat" | ||||||
|         - rust: 1.17.0 |         - rust: 1.17.0 | ||||||
|   | |||||||
| @@ -1,4 +1,4 @@ | |||||||
| #![deny(warnings)] | //#![deny(warnings)] | ||||||
| extern crate futures; | extern crate futures; | ||||||
| extern crate hyper; | extern crate hyper; | ||||||
| extern crate tokio_core; | extern crate tokio_core; | ||||||
| @@ -32,7 +32,9 @@ fn main() { | |||||||
|  |  | ||||||
|     let mut core = tokio_core::reactor::Core::new().unwrap(); |     let mut core = tokio_core::reactor::Core::new().unwrap(); | ||||||
|     let handle = core.handle(); |     let handle = core.handle(); | ||||||
|     let client = Client::new(&handle); |     let client = Client::configure() | ||||||
|  |         .no_proto() | ||||||
|  |         .build(&handle); | ||||||
|  |  | ||||||
|     let work = client.get(url).and_then(|res| { |     let work = client.get(url).and_then(|res| { | ||||||
|         println!("Response: {}", res.status()); |         println!("Response: {}", res.status()); | ||||||
|   | |||||||
| @@ -31,7 +31,8 @@ impl Service for Hello { | |||||||
| fn main() { | fn main() { | ||||||
|     pretty_env_logger::init().unwrap(); |     pretty_env_logger::init().unwrap(); | ||||||
|     let addr = "127.0.0.1:3000".parse().unwrap(); |     let addr = "127.0.0.1:3000".parse().unwrap(); | ||||||
|     let server = Http::new().bind(&addr, || Ok(Hello)).unwrap(); |     let mut server = Http::new().bind(&addr, || Ok(Hello)).unwrap(); | ||||||
|  |     server.no_proto(); | ||||||
|     println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); |     println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); | ||||||
|     server.run().unwrap(); |     server.run().unwrap(); | ||||||
| } | } | ||||||
|   | |||||||
| @@ -47,7 +47,8 @@ fn main() { | |||||||
|     pretty_env_logger::init().unwrap(); |     pretty_env_logger::init().unwrap(); | ||||||
|     let addr = "127.0.0.1:1337".parse().unwrap(); |     let addr = "127.0.0.1:1337".parse().unwrap(); | ||||||
|  |  | ||||||
|     let server = Http::new().bind(&addr, || Ok(Echo)).unwrap(); |     let mut server = Http::new().bind(&addr, || Ok(Echo)).unwrap(); | ||||||
|  |     server.no_proto(); | ||||||
|     println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); |     println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); | ||||||
|     server.run().unwrap(); |     server.run().unwrap(); | ||||||
| } | } | ||||||
|   | |||||||
| @@ -20,7 +20,7 @@ use tokio_proto::util::client_proxy::ClientProxy; | |||||||
| pub use tokio_service::Service; | pub use tokio_service::Service; | ||||||
|  |  | ||||||
| use header::{Headers, Host}; | use header::{Headers, Host}; | ||||||
| use proto::{self, TokioBody}; | use proto::{self, RequestHead, TokioBody}; | ||||||
| use proto::response; | use proto::response; | ||||||
| use proto::request; | use proto::request; | ||||||
| use method::Method; | use method::Method; | ||||||
| @@ -45,7 +45,7 @@ pub mod compat; | |||||||
| pub struct Client<C, B = proto::Body> { | pub struct Client<C, B = proto::Body> { | ||||||
|     connector: C, |     connector: C, | ||||||
|     handle: Handle, |     handle: Handle, | ||||||
|     pool: Pool<TokioClient<B>>, |     pool: Dispatch<B>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Client<HttpConnector, proto::Body> { | impl Client<HttpConnector, proto::Body> { | ||||||
| @@ -93,7 +93,11 @@ impl<C, B> Client<C, B> { | |||||||
|         Client { |         Client { | ||||||
|             connector: config.connector, |             connector: config.connector, | ||||||
|             handle: handle.clone(), |             handle: handle.clone(), | ||||||
|             pool: Pool::new(config.keep_alive, config.keep_alive_timeout), |             pool: if config.no_proto { | ||||||
|  |                 Dispatch::Hyper(Pool::new(config.keep_alive, config.keep_alive_timeout)) | ||||||
|  |             } else { | ||||||
|  |                 Dispatch::Proto(Pool::new(config.keep_alive, config.keep_alive_timeout)) | ||||||
|  |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -187,10 +191,13 @@ where C: Connect, | |||||||
|         headers.extend(head.headers.iter()); |         headers.extend(head.headers.iter()); | ||||||
|         head.headers = headers; |         head.headers = headers; | ||||||
|  |  | ||||||
|         let checkout = self.pool.checkout(domain.as_ref()); |         match self.pool { | ||||||
|  |             Dispatch::Proto(ref pool) => { | ||||||
|  |                 trace!("proto_dispatch"); | ||||||
|  |                 let checkout = pool.checkout(domain.as_ref()); | ||||||
|                 let connect = { |                 let connect = { | ||||||
|                     let handle = self.handle.clone(); |                     let handle = self.handle.clone(); | ||||||
|             let pool = self.pool.clone(); |                     let pool = pool.clone(); | ||||||
|                     let pool_key = Rc::new(domain.to_string()); |                     let pool_key = Rc::new(domain.to_string()); | ||||||
|                     self.connector.connect(url) |                     self.connector.connect(url) | ||||||
|                         .map(move |io| { |                         .map(move |io| { | ||||||
| @@ -229,6 +236,55 @@ where C: Connect, | |||||||
|                         Message::WithBody(head, body) => response::from_wire(head, Some(body.into())), |                         Message::WithBody(head, body) => response::from_wire(head, Some(body.into())), | ||||||
|                     } |                     } | ||||||
|                 }))) |                 }))) | ||||||
|  |             }, | ||||||
|  |             Dispatch::Hyper(ref pool) => { | ||||||
|  |                 trace!("no_proto dispatch"); | ||||||
|  |                 use futures::Sink; | ||||||
|  |                 use futures::sync::{mpsc, oneshot}; | ||||||
|  |  | ||||||
|  |                 let checkout = pool.checkout(domain.as_ref()); | ||||||
|  |                 let connect = { | ||||||
|  |                     let handle = self.handle.clone(); | ||||||
|  |                     let pool = pool.clone(); | ||||||
|  |                     let pool_key = Rc::new(domain.to_string()); | ||||||
|  |                     self.connector.connect(url) | ||||||
|  |                         .map(move |io| { | ||||||
|  |                             let (tx, rx) = mpsc::channel(1); | ||||||
|  |                             let pooled = pool.pooled(pool_key, RefCell::new(tx)); | ||||||
|  |                             let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone()); | ||||||
|  |                             let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn); | ||||||
|  |                             handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err))); | ||||||
|  |                             pooled | ||||||
|  |                         }) | ||||||
|  |                 }; | ||||||
|  |  | ||||||
|  |                 let race = checkout.select(connect) | ||||||
|  |                     .map(|(client, _work)| client) | ||||||
|  |                     .map_err(|(e, _work)| { | ||||||
|  |                         // the Pool Checkout cannot error, so the only error | ||||||
|  |                         // is from the Connector | ||||||
|  |                         // XXX: should wait on the Checkout? Problem is | ||||||
|  |                         // that if the connector is failing, it may be that we | ||||||
|  |                         // never had a pooled stream at all | ||||||
|  |                         e.into() | ||||||
|  |                     }); | ||||||
|  |  | ||||||
|  |                 let resp = race.and_then(move |client| { | ||||||
|  |                     let (callback, rx) = oneshot::channel(); | ||||||
|  |                     client.borrow_mut().start_send((head, body, callback)).unwrap(); | ||||||
|  |                     rx.then(|res| { | ||||||
|  |                         match res { | ||||||
|  |                             Ok(Ok(res)) => Ok(res), | ||||||
|  |                             Ok(Err(err)) => Err(err), | ||||||
|  |                             Err(_) => panic!("dispatch dropped without returning error"), | ||||||
|  |                         } | ||||||
|  |                     }) | ||||||
|  |                 }); | ||||||
|  |  | ||||||
|  |                 FutureResponse(Box::new(resp)) | ||||||
|  |  | ||||||
|  |             } | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| } | } | ||||||
| @@ -238,7 +294,10 @@ impl<C: Clone, B> Clone for Client<C, B> { | |||||||
|         Client { |         Client { | ||||||
|             connector: self.connector.clone(), |             connector: self.connector.clone(), | ||||||
|             handle: self.handle.clone(), |             handle: self.handle.clone(), | ||||||
|             pool: self.pool.clone(), |             pool: match self.pool { | ||||||
|  |                 Dispatch::Proto(ref pool) => Dispatch::Proto(pool.clone()), | ||||||
|  |                 Dispatch::Hyper(ref pool) => Dispatch::Hyper(pool.clone()), | ||||||
|  |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -249,10 +308,16 @@ impl<C, B> fmt::Debug for Client<C, B> { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| type TokioClient<B> = ClientProxy<Message<proto::RequestHead, B>, Message<proto::ResponseHead, TokioBody>, ::Error>; | type ProtoClient<B> = ClientProxy<Message<RequestHead, B>, Message<proto::ResponseHead, TokioBody>, ::Error>; | ||||||
|  | type HyperClient<B> = RefCell<::futures::sync::mpsc::Sender<(RequestHead, Option<B>, ::futures::sync::oneshot::Sender<::Result<::Response>>)>>; | ||||||
|  |  | ||||||
|  | enum Dispatch<B> { | ||||||
|  |     Proto(Pool<ProtoClient<B>>), | ||||||
|  |     Hyper(Pool<HyperClient<B>>), | ||||||
|  | } | ||||||
|  |  | ||||||
| struct HttpClient<B> { | struct HttpClient<B> { | ||||||
|     client_rx: RefCell<Option<oneshot::Receiver<Pooled<TokioClient<B>>>>>, |     client_rx: RefCell<Option<oneshot::Receiver<Pooled<ProtoClient<B>>>>>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<T, B> ClientProto<T> for HttpClient<B> | impl<T, B> ClientProto<T> for HttpClient<B> | ||||||
| @@ -265,7 +330,7 @@ where T: AsyncRead + AsyncWrite + 'static, | |||||||
|     type Response = proto::ResponseHead; |     type Response = proto::ResponseHead; | ||||||
|     type ResponseBody = proto::Chunk; |     type ResponseBody = proto::Chunk; | ||||||
|     type Error = ::Error; |     type Error = ::Error; | ||||||
|     type Transport = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<TokioClient<B>>>; |     type Transport = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<ProtoClient<B>>>; | ||||||
|     type BindTransport = BindingClient<T, B>; |     type BindTransport = BindingClient<T, B>; | ||||||
|  |  | ||||||
|     fn bind_transport(&self, io: T) -> Self::BindTransport { |     fn bind_transport(&self, io: T) -> Self::BindTransport { | ||||||
| @@ -277,7 +342,7 @@ where T: AsyncRead + AsyncWrite + 'static, | |||||||
| } | } | ||||||
|  |  | ||||||
| struct BindingClient<T, B> { | struct BindingClient<T, B> { | ||||||
|     rx: oneshot::Receiver<Pooled<TokioClient<B>>>, |     rx: oneshot::Receiver<Pooled<ProtoClient<B>>>, | ||||||
|     io: Option<T>, |     io: Option<T>, | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -286,7 +351,7 @@ where T: AsyncRead + AsyncWrite + 'static, | |||||||
|       B: Stream<Error=::Error>, |       B: Stream<Error=::Error>, | ||||||
|       B::Item: AsRef<[u8]>, |       B::Item: AsRef<[u8]>, | ||||||
| { | { | ||||||
|     type Item = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<TokioClient<B>>>; |     type Item = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<ProtoClient<B>>>; | ||||||
|     type Error = io::Error; |     type Error = io::Error; | ||||||
|  |  | ||||||
|     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { |     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||||
| @@ -309,6 +374,7 @@ pub struct Config<C, B> { | |||||||
|     keep_alive_timeout: Option<Duration>, |     keep_alive_timeout: Option<Duration>, | ||||||
|     //TODO: make use of max_idle config |     //TODO: make use of max_idle config | ||||||
|     max_idle: usize, |     max_idle: usize, | ||||||
|  |     no_proto: bool, | ||||||
| } | } | ||||||
|  |  | ||||||
| /// Phantom type used to signal that `Config` should create a `HttpConnector`. | /// Phantom type used to signal that `Config` should create a `HttpConnector`. | ||||||
| @@ -324,6 +390,7 @@ impl Default for Config<UseDefaultConnector, proto::Body> { | |||||||
|             keep_alive: true, |             keep_alive: true, | ||||||
|             keep_alive_timeout: Some(Duration::from_secs(90)), |             keep_alive_timeout: Some(Duration::from_secs(90)), | ||||||
|             max_idle: 5, |             max_idle: 5, | ||||||
|  |             no_proto: false, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -347,6 +414,7 @@ impl<C, B> Config<C, B> { | |||||||
|             keep_alive: self.keep_alive, |             keep_alive: self.keep_alive, | ||||||
|             keep_alive_timeout: self.keep_alive_timeout, |             keep_alive_timeout: self.keep_alive_timeout, | ||||||
|             max_idle: self.max_idle, |             max_idle: self.max_idle, | ||||||
|  |             no_proto: self.no_proto, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -360,6 +428,7 @@ impl<C, B> Config<C, B> { | |||||||
|             keep_alive: self.keep_alive, |             keep_alive: self.keep_alive, | ||||||
|             keep_alive_timeout: self.keep_alive_timeout, |             keep_alive_timeout: self.keep_alive_timeout, | ||||||
|             max_idle: self.max_idle, |             max_idle: self.max_idle, | ||||||
|  |             no_proto: self.no_proto, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -393,6 +462,13 @@ impl<C, B> Config<C, B> { | |||||||
|         self |         self | ||||||
|     } |     } | ||||||
|     */ |     */ | ||||||
|  |  | ||||||
|  |     /// Disable tokio-proto internal usage. | ||||||
|  |     #[inline] | ||||||
|  |     pub fn no_proto(mut self) -> Config<C, B> { | ||||||
|  |         self.no_proto = true; | ||||||
|  |         self | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<C, B> Config<C, B> | impl<C, B> Config<C, B> | ||||||
| @@ -431,11 +507,8 @@ impl<C, B> fmt::Debug for Config<C, B> { | |||||||
| impl<C: Clone, B> Clone for Config<C, B> { | impl<C: Clone, B> Clone for Config<C, B> { | ||||||
|     fn clone(&self) -> Config<C, B> { |     fn clone(&self) -> Config<C, B> { | ||||||
|         Config { |         Config { | ||||||
|             _body_type: PhantomData::<B>, |  | ||||||
|             connector: self.connector.clone(), |             connector: self.connector.clone(), | ||||||
|             keep_alive: self.keep_alive, |             .. *self | ||||||
|             keep_alive_timeout: self.keep_alive_timeout, |  | ||||||
|             max_idle: self.max_idle, |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -7,6 +7,7 @@ use std::borrow::Cow; | |||||||
| use super::Chunk; | use super::Chunk; | ||||||
|  |  | ||||||
| pub type TokioBody = tokio_proto::streaming::Body<Chunk, ::Error>; | pub type TokioBody = tokio_proto::streaming::Body<Chunk, ::Error>; | ||||||
|  | pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>; | ||||||
|  |  | ||||||
| /// A `Stream` for `Chunk`s used in requests and responses. | /// A `Stream` for `Chunk`s used in requests and responses. | ||||||
| #[must_use = "streams do nothing unless polled"] | #[must_use = "streams do nothing unless polled"] | ||||||
|   | |||||||
| @@ -7,7 +7,7 @@ use futures::task::Task; | |||||||
| use tokio_io::{AsyncRead, AsyncWrite}; | use tokio_io::{AsyncRead, AsyncWrite}; | ||||||
| use tokio_proto::streaming::pipeline::{Frame, Transport}; | use tokio_proto::streaming::pipeline::{Frame, Transport}; | ||||||
|  |  | ||||||
| use proto::{Http1Transaction}; | use proto::Http1Transaction; | ||||||
| use super::io::{Cursor, Buffered}; | use super::io::{Cursor, Buffered}; | ||||||
| use super::h1::{Encoder, Decoder}; | use super::h1::{Encoder, Decoder}; | ||||||
| use method::Method; | use method::Method; | ||||||
| @@ -51,15 +51,28 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         self.io.set_flush_pipeline(enabled); |         self.io.set_flush_pipeline(enabled); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn poll2(&mut self) -> Poll<Option<Frame<super::MessageHead<T::Incoming>, super::Chunk, ::Error>>, io::Error> { |     fn poll_incoming(&mut self) -> Poll<Option<Frame<super::MessageHead<T::Incoming>, super::Chunk, ::Error>>, io::Error> { | ||||||
|         trace!("Conn::poll()"); |         trace!("Conn::poll_incoming()"); | ||||||
|  |  | ||||||
|         loop { |         loop { | ||||||
|             if self.is_read_closed() { |             if self.is_read_closed() { | ||||||
|                 trace!("Conn::poll when closed"); |                 trace!("Conn::poll when closed"); | ||||||
|                 return Ok(Async::Ready(None)); |                 return Ok(Async::Ready(None)); | ||||||
|             } else if self.can_read_head() { |             } else if self.can_read_head() { | ||||||
|                 return self.read_head(); |                 return match self.read_head() { | ||||||
|  |                     Ok(Async::Ready(Some((head, body)))) => { | ||||||
|  |                         Ok(Async::Ready(Some(Frame::Message { | ||||||
|  |                             message: head, | ||||||
|  |                             body: body, | ||||||
|  |                         }))) | ||||||
|  |                     }, | ||||||
|  |                     Ok(Async::Ready(None)) => Ok(Async::Ready(None)), | ||||||
|  |                     Ok(Async::NotReady) => Ok(Async::NotReady), | ||||||
|  |                     Err(::Error::Io(err)) => Err(err), | ||||||
|  |                     Err(err) => Ok(Async::Ready(Some(Frame::Error { | ||||||
|  |                         error: err, | ||||||
|  |                     }))), | ||||||
|  |                 }; | ||||||
|             } else if self.can_write_continue() { |             } else if self.can_write_continue() { | ||||||
|                 try_nb!(self.flush()); |                 try_nb!(self.flush()); | ||||||
|             } else if self.can_read_body() { |             } else if self.can_read_body() { | ||||||
| @@ -79,18 +92,26 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn is_read_closed(&self) -> bool { |     pub fn is_read_closed(&self) -> bool { | ||||||
|         self.state.is_read_closed() |         self.state.is_read_closed() | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[allow(unused)] |     pub fn is_write_closed(&self) -> bool { | ||||||
|     fn is_write_closed(&self) -> bool { |  | ||||||
|         self.state.is_write_closed() |         self.state.is_write_closed() | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn can_read_head(&self) -> bool { |     pub fn can_read_head(&self) -> bool { | ||||||
|         match self.state.reading { |         match self.state.reading { | ||||||
|             Reading::Init => true, |             Reading::Init => { | ||||||
|  |                 if T::should_read_first() { | ||||||
|  |                     true | ||||||
|  |                 } else { | ||||||
|  |                     match self.state.writing { | ||||||
|  |                         Writing::Init => false, | ||||||
|  |                         _ => true, | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|             _ => false, |             _ => false, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| @@ -102,14 +123,14 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn can_read_body(&self) -> bool { |     pub fn can_read_body(&self) -> bool { | ||||||
|         match self.state.reading { |         match self.state.reading { | ||||||
|             Reading::Body(..) => true, |             Reading::Body(..) => true, | ||||||
|             _ => false, |             _ => false, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn read_head(&mut self) -> Poll<Option<Frame<super::MessageHead<T::Incoming>, super::Chunk, ::Error>>, io::Error> { |     pub fn read_head(&mut self) -> Poll<Option<(super::MessageHead<T::Incoming>, bool)>, ::Error> { | ||||||
|         debug_assert!(self.can_read_head()); |         debug_assert!(self.can_read_head()); | ||||||
|         trace!("Conn::read_head"); |         trace!("Conn::read_head"); | ||||||
|  |  | ||||||
| @@ -117,13 +138,16 @@ where I: AsyncRead + AsyncWrite, | |||||||
|             Ok(Async::Ready(head)) => (head.version, head), |             Ok(Async::Ready(head)) => (head.version, head), | ||||||
|             Ok(Async::NotReady) => return Ok(Async::NotReady), |             Ok(Async::NotReady) => return Ok(Async::NotReady), | ||||||
|             Err(e) => { |             Err(e) => { | ||||||
|                 let must_respond_with_error = !self.state.is_idle(); |                 // If we are currently waiting on a message, then an empty | ||||||
|  |                 // message should be reported as an error. If not, it is just | ||||||
|  |                 // the connection closing gracefully. | ||||||
|  |                 let must_error = !self.state.is_idle() && T::should_error_on_parse_eof(); | ||||||
|                 self.state.close_read(); |                 self.state.close_read(); | ||||||
|                 self.io.consume_leading_lines(); |                 self.io.consume_leading_lines(); | ||||||
|                 let was_mid_parse = !self.io.read_buf().is_empty(); |                 let was_mid_parse = !self.io.read_buf().is_empty(); | ||||||
|                 return if was_mid_parse || must_respond_with_error { |                 return if was_mid_parse || must_error { | ||||||
|                     debug!("parse error ({}) with {} bytes", e, self.io.read_buf().len()); |                     debug!("parse error ({}) with {} bytes", e, self.io.read_buf().len()); | ||||||
|                     Ok(Async::Ready(Some(Frame::Error { error: e }))) |                     Err(e) | ||||||
|                 } else { |                 } else { | ||||||
|                     debug!("read eof"); |                     debug!("read eof"); | ||||||
|                     Ok(Async::Ready(None)) |                     Ok(Async::Ready(None)) | ||||||
| @@ -138,7 +162,7 @@ where I: AsyncRead + AsyncWrite, | |||||||
|                     Err(e) => { |                     Err(e) => { | ||||||
|                         debug!("decoder error = {:?}", e); |                         debug!("decoder error = {:?}", e); | ||||||
|                         self.state.close_read(); |                         self.state.close_read(); | ||||||
|                         return Ok(Async::Ready(Some(Frame::Error { error: e }))); |                         return Err(e); | ||||||
|                     } |                     } | ||||||
|                 }; |                 }; | ||||||
|                 self.state.busy(); |                 self.state.busy(); | ||||||
| @@ -154,17 +178,17 @@ where I: AsyncRead + AsyncWrite, | |||||||
|                     (true, Reading::Body(decoder)) |                     (true, Reading::Body(decoder)) | ||||||
|                 }; |                 }; | ||||||
|                 self.state.reading = reading; |                 self.state.reading = reading; | ||||||
|                 Ok(Async::Ready(Some(Frame::Message { message: head, body: body }))) |                 Ok(Async::Ready(Some((head, body)))) | ||||||
|             }, |             }, | ||||||
|             _ => { |             _ => { | ||||||
|                 error!("unimplemented HTTP Version = {:?}", version); |                 error!("unimplemented HTTP Version = {:?}", version); | ||||||
|                 self.state.close_read(); |                 self.state.close_read(); | ||||||
|                 Ok(Async::Ready(Some(Frame::Error { error: ::Error::Version }))) |                 Err(::Error::Version) | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn read_body(&mut self) -> Poll<Option<super::Chunk>, io::Error> { |     pub fn read_body(&mut self) -> Poll<Option<super::Chunk>, io::Error> { | ||||||
|         debug_assert!(self.can_read_body()); |         debug_assert!(self.can_read_body()); | ||||||
|  |  | ||||||
|         trace!("Conn::read_body"); |         trace!("Conn::read_body"); | ||||||
| @@ -187,7 +211,7 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         ret |         ret | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn maybe_park_read(&mut self) { |     pub fn maybe_park_read(&mut self) { | ||||||
|         if !self.io.is_read_blocked() { |         if !self.io.is_read_blocked() { | ||||||
|             // the Io object is ready to read, which means it will never alert |             // the Io object is ready to read, which means it will never alert | ||||||
|             // us that it is ready until we drain it. However, we're currently |             // us that it is ready until we drain it. However, we're currently | ||||||
| @@ -236,13 +260,16 @@ where I: AsyncRead + AsyncWrite, | |||||||
|                         return |                         return | ||||||
|                     }, |                     }, | ||||||
|                     Err(e) => { |                     Err(e) => { | ||||||
|                         trace!("maybe_notify read_from_io error: {}", e); |                         trace!("maybe_notify; read_from_io error: {}", e); | ||||||
|                         self.state.close(); |                         self.state.close(); | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|             if let Some(ref task) = self.state.read_task { |             if let Some(ref task) = self.state.read_task { | ||||||
|  |                 trace!("maybe_notify; notifying task"); | ||||||
|                 task.notify(); |                 task.notify(); | ||||||
|  |             } else { | ||||||
|  |                 trace!("maybe_notify; no task to notify"); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| @@ -252,14 +279,14 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         self.maybe_notify(); |         self.maybe_notify(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn can_write_head(&self) -> bool { |     pub fn can_write_head(&self) -> bool { | ||||||
|         match self.state.writing { |         match self.state.writing { | ||||||
|             Writing::Continue(..) | Writing::Init => true, |             Writing::Continue(..) | Writing::Init => true, | ||||||
|             _ => false |             _ => false | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn can_write_body(&self) -> bool { |     pub fn can_write_body(&self) -> bool { | ||||||
|         match self.state.writing { |         match self.state.writing { | ||||||
|             Writing::Body(..) => true, |             Writing::Body(..) => true, | ||||||
|             Writing::Continue(..) | |             Writing::Continue(..) | | ||||||
| @@ -277,7 +304,7 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn write_head(&mut self, head: super::MessageHead<T::Outgoing>, body: bool) { |     pub fn write_head(&mut self, head: super::MessageHead<T::Outgoing>, body: bool) { | ||||||
|         debug_assert!(self.can_write_head()); |         debug_assert!(self.can_write_head()); | ||||||
|  |  | ||||||
|         let wants_keep_alive = head.should_keep_alive(); |         let wants_keep_alive = head.should_keep_alive(); | ||||||
| @@ -298,7 +325,7 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         }; |         }; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn write_body(&mut self, chunk: Option<B>) -> StartSend<Option<B>, io::Error> { |     pub fn write_body(&mut self, chunk: Option<B>) -> StartSend<Option<B>, io::Error> { | ||||||
|         debug_assert!(self.can_write_body()); |         debug_assert!(self.can_write_body()); | ||||||
|  |  | ||||||
|         if self.has_queued_body() { |         if self.has_queued_body() { | ||||||
| @@ -397,7 +424,7 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         Ok(Async::Ready(())) |         Ok(Async::Ready(())) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn flush(&mut self) -> Poll<(), io::Error> { |     pub fn flush(&mut self) -> Poll<(), io::Error> { | ||||||
|         loop { |         loop { | ||||||
|             let queue_finished = try!(self.write_queued()).is_ready(); |             let queue_finished = try!(self.write_queued()).is_ready(); | ||||||
|             try_nb!(self.io.flush()); |             try_nb!(self.io.flush()); | ||||||
| @@ -410,8 +437,18 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         Ok(Async::Ready(())) |         Ok(Async::Ready(())) | ||||||
|  |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn close_read(&mut self) { | ||||||
|  |         self.state.close_read(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn close_write(&mut self) { | ||||||
|  |         self.state.close_write(); | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // ==== tokio_proto impl ==== | ||||||
|  |  | ||||||
| impl<I, B, T, K> Stream for Conn<I, B, T, K> | impl<I, B, T, K> Stream for Conn<I, B, T, K> | ||||||
| where I: AsyncRead + AsyncWrite, | where I: AsyncRead + AsyncWrite, | ||||||
|       B: AsRef<[u8]>, |       B: AsRef<[u8]>, | ||||||
| @@ -423,7 +460,7 @@ where I: AsyncRead + AsyncWrite, | |||||||
|  |  | ||||||
|     #[inline] |     #[inline] | ||||||
|     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { |     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | ||||||
|         self.poll2().map_err(|err| { |         self.poll_incoming().map_err(|err| { | ||||||
|             debug!("poll error: {}", err); |             debug!("poll error: {}", err); | ||||||
|             err |             err | ||||||
|         }) |         }) | ||||||
| @@ -635,6 +672,12 @@ impl<B, K: KeepAlive> State<B, K> { | |||||||
|         self.keep_alive.disable(); |         self.keep_alive.disable(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     fn close_write(&mut self) { | ||||||
|  |         trace!("State::close_write()"); | ||||||
|  |         self.writing = Writing::Closed; | ||||||
|  |         self.keep_alive.disable(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     fn try_keep_alive(&mut self) { |     fn try_keep_alive(&mut self) { | ||||||
|         match (&self.reading, &self.writing) { |         match (&self.reading, &self.writing) { | ||||||
|             (&Reading::KeepAlive, &Writing::KeepAlive) => { |             (&Reading::KeepAlive, &Writing::KeepAlive) => { | ||||||
| @@ -652,14 +695,6 @@ impl<B, K: KeepAlive> State<B, K> { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn is_idle(&self) -> bool { |  | ||||||
|         if let KA::Idle = self.keep_alive.status() { |  | ||||||
|             true |  | ||||||
|         } else { |  | ||||||
|             false |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     fn busy(&mut self) { |     fn busy(&mut self) { | ||||||
|         if let KA::Disabled = self.keep_alive.status() { |         if let KA::Disabled = self.keep_alive.status() { | ||||||
|             return; |             return; | ||||||
| @@ -674,6 +709,14 @@ impl<B, K: KeepAlive> State<B, K> { | |||||||
|         self.keep_alive.idle(); |         self.keep_alive.idle(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     fn is_idle(&self) -> bool { | ||||||
|  |         if let KA::Idle = self.keep_alive.status() { | ||||||
|  |             true | ||||||
|  |         } else { | ||||||
|  |             false | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     fn is_read_closed(&self) -> bool { |     fn is_read_closed(&self) -> bool { | ||||||
|         match self.reading { |         match self.reading { | ||||||
|             Reading::Closed => true, |             Reading::Closed => true, | ||||||
| @@ -681,7 +724,6 @@ impl<B, K: KeepAlive> State<B, K> { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[allow(unused)] |  | ||||||
|     fn is_write_closed(&self) -> bool { |     fn is_write_closed(&self) -> bool { | ||||||
|         match self.writing { |         match self.writing { | ||||||
|             Writing::Closed => true, |             Writing::Closed => true, | ||||||
| @@ -727,7 +769,7 @@ mod tests { | |||||||
|     use futures::future; |     use futures::future; | ||||||
|     use tokio_proto::streaming::pipeline::Frame; |     use tokio_proto::streaming::pipeline::Frame; | ||||||
|  |  | ||||||
|     use proto::{self, MessageHead, ServerTransaction}; |     use proto::{self, ClientTransaction, MessageHead, ServerTransaction}; | ||||||
|     use super::super::h1::Encoder; |     use super::super::h1::Encoder; | ||||||
|     use mock::AsyncIo; |     use mock::AsyncIo; | ||||||
|  |  | ||||||
| @@ -799,22 +841,43 @@ mod tests { | |||||||
|         let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default()); |         let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default()); | ||||||
|         conn.state.idle(); |         conn.state.idle(); | ||||||
|  |  | ||||||
|         match conn.poll().unwrap() { |         match conn.poll() { | ||||||
|             Async::Ready(Some(Frame::Error { .. })) => {}, |             Err(ref err) if err.kind() == ::std::io::ErrorKind::UnexpectedEof => {}, | ||||||
|             other => panic!("frame is not Error: {:?}", other) |             other => panic!("unexpected frame: {:?}", other) | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_conn_init_read_eof_busy() { |     fn test_conn_init_read_eof_busy() { | ||||||
|  |         let _: Result<(), ()> = future::lazy(|| { | ||||||
|  |             // server ignores | ||||||
|             let io = AsyncIo::new_buf(vec![], 1); |             let io = AsyncIo::new_buf(vec![], 1); | ||||||
|             let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default()); |             let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default()); | ||||||
|             conn.state.busy(); |             conn.state.busy(); | ||||||
|  |  | ||||||
|             match conn.poll().unwrap() { |             match conn.poll().unwrap() { | ||||||
|             Async::Ready(Some(Frame::Error { .. })) => {}, |                 Async::Ready(None) => {}, | ||||||
|             other => panic!("frame is not Error: {:?}", other) |                 other => panic!("unexpected frame: {:?}", other) | ||||||
|             } |             } | ||||||
|  |  | ||||||
|  |             // client | ||||||
|  |             let io = AsyncIo::new_buf(vec![], 1); | ||||||
|  |             let mut conn = Conn::<_, proto::Chunk, ClientTransaction>::new(io, Default::default()); | ||||||
|  |             conn.state.busy(); | ||||||
|  |  | ||||||
|  |             match conn.poll() { | ||||||
|  |                 Ok(Async::NotReady) => {}, | ||||||
|  |                 other => panic!("unexpected frame: {:?}", other) | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             // once mid-request, returns the error | ||||||
|  |             conn.state.writing = super::Writing::KeepAlive; | ||||||
|  |             match conn.poll() { | ||||||
|  |                 Err(ref err) if err.kind() == ::std::io::ErrorKind::UnexpectedEof => {}, | ||||||
|  |                 other => panic!("unexpected frame: {:?}", other) | ||||||
|  |             } | ||||||
|  |             Ok(()) | ||||||
|  |         }).wait(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|   | |||||||
							
								
								
									
										325
									
								
								src/proto/dispatch.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										325
									
								
								src/proto/dispatch.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,325 @@ | |||||||
|  | use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; | ||||||
|  | use futures::sync::{mpsc, oneshot}; | ||||||
|  | use tokio_io::{AsyncRead, AsyncWrite}; | ||||||
|  | use tokio_service::Service; | ||||||
|  |  | ||||||
|  | use super::{Body, Conn, KeepAlive, Http1Transaction, MessageHead, RequestHead, ResponseHead}; | ||||||
|  | use ::StatusCode; | ||||||
|  |  | ||||||
|  | pub struct Dispatcher<D, Bs, I, B, T, K> { | ||||||
|  |     conn: Conn<I, B, T, K>, | ||||||
|  |     dispatch: D, | ||||||
|  |     body_tx: Option<super::body::BodySender>, | ||||||
|  |     body_rx: Option<Bs>, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | pub trait Dispatch { | ||||||
|  |     type PollItem; | ||||||
|  |     type PollBody; | ||||||
|  |     type RecvItem; | ||||||
|  |     fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error>; | ||||||
|  |     fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>; | ||||||
|  |     fn should_poll(&self) -> bool; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | pub struct Server<S: Service> { | ||||||
|  |     in_flight: Option<S::Future>, | ||||||
|  |     service: S, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | pub struct Client<B> { | ||||||
|  |     callback: Option<oneshot::Sender<::Result<::Response>>>, | ||||||
|  |     rx: ClientRx<B>, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type ClientRx<B> = mpsc::Receiver<(RequestHead, Option<B>, oneshot::Sender<::Result<::Response>>)>; | ||||||
|  |  | ||||||
|  | impl<D, Bs, I, B, T, K> Dispatcher<D, Bs, I, B, T, K> | ||||||
|  | where | ||||||
|  |     D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>, | ||||||
|  |     I: AsyncRead + AsyncWrite, | ||||||
|  |     B: AsRef<[u8]>, | ||||||
|  |     T: Http1Transaction, | ||||||
|  |     K: KeepAlive, | ||||||
|  |     Bs: Stream<Item=B, Error=::Error>, | ||||||
|  | { | ||||||
|  |     pub fn new(dispatch: D, conn: Conn<I, B, T, K>) -> Self { | ||||||
|  |         Dispatcher { | ||||||
|  |             conn: conn, | ||||||
|  |             dispatch: dispatch, | ||||||
|  |             body_tx: None, | ||||||
|  |             body_rx: None, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn poll_read(&mut self) -> Poll<(), ::Error> { | ||||||
|  |         loop { | ||||||
|  |             if self.conn.can_read_head() { | ||||||
|  |                 match self.conn.read_head() { | ||||||
|  |                     Ok(Async::Ready(Some((head, has_body)))) => { | ||||||
|  |                         let body = if has_body { | ||||||
|  |                             let (tx, rx) = super::Body::pair(); | ||||||
|  |                             self.body_tx = Some(tx); | ||||||
|  |                             rx | ||||||
|  |                         } else { | ||||||
|  |                             Body::empty() | ||||||
|  |                         }; | ||||||
|  |                         self.dispatch.recv_msg(Ok((head, body))).expect("recv_msg with Ok shouldn't error"); | ||||||
|  |                     }, | ||||||
|  |                     Ok(Async::Ready(None)) => { | ||||||
|  |                         // read eof, conn will start to shutdown automatically | ||||||
|  |                         return Ok(Async::Ready(())); | ||||||
|  |                     } | ||||||
|  |                     Ok(Async::NotReady) => return Ok(Async::NotReady), | ||||||
|  |                     Err(err) => { | ||||||
|  |                         debug!("read_head error: {}", err); | ||||||
|  |                         self.dispatch.recv_msg(Err(err))?; | ||||||
|  |                         // if here, the dispatcher gave the user the error | ||||||
|  |                         // somewhere else. we still need to shutdown, but | ||||||
|  |                         // not as a second error. | ||||||
|  |                         return Ok(Async::Ready(())); | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |             } else if let Some(mut body) = self.body_tx.take() { | ||||||
|  |                 let can_read_body = self.conn.can_read_body(); | ||||||
|  |                 match body.poll_ready() { | ||||||
|  |                     Ok(Async::Ready(())) => (), | ||||||
|  |                     Ok(Async::NotReady) => { | ||||||
|  |                         self.body_tx = Some(body); | ||||||
|  |                         return Ok(Async::NotReady); | ||||||
|  |                     }, | ||||||
|  |                     Err(_canceled) => { | ||||||
|  |                         // user doesn't care about the body | ||||||
|  |                         // so we should stop reading | ||||||
|  |                         if can_read_body { | ||||||
|  |                             trace!("body receiver dropped before eof, closing"); | ||||||
|  |                             self.conn.close_read(); | ||||||
|  |                             return Ok(Async::Ready(())); | ||||||
|  |                         } | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |                 if can_read_body { | ||||||
|  |                     match self.conn.read_body() { | ||||||
|  |                         Ok(Async::Ready(Some(chunk))) => { | ||||||
|  |                             match body.start_send(Ok(chunk)) { | ||||||
|  |                                 Ok(AsyncSink::Ready) => { | ||||||
|  |                                     self.body_tx = Some(body); | ||||||
|  |                                 }, | ||||||
|  |                                 Ok(AsyncSink::NotReady(_chunk)) => { | ||||||
|  |                                     unreachable!("mpsc poll_ready was ready, start_send was not"); | ||||||
|  |                                 } | ||||||
|  |                                 Err(_canceled) => { | ||||||
|  |                                     if self.conn.can_read_body() { | ||||||
|  |                                         trace!("body receiver dropped before eof, closing"); | ||||||
|  |                                         self.conn.close_read(); | ||||||
|  |                                     } | ||||||
|  |                                 } | ||||||
|  |  | ||||||
|  |                             } | ||||||
|  |                         }, | ||||||
|  |                         Ok(Async::Ready(None)) => { | ||||||
|  |                             let _ = body.close(); | ||||||
|  |                         }, | ||||||
|  |                         Ok(Async::NotReady) => { | ||||||
|  |                             self.body_tx = Some(body); | ||||||
|  |                             return Ok(Async::NotReady); | ||||||
|  |                         } | ||||||
|  |                         Err(e) => { | ||||||
|  |                             let _ = body.start_send(Err(::Error::Io(e))); | ||||||
|  |                         } | ||||||
|  |                     } | ||||||
|  |                 } else { | ||||||
|  |                     let _ = body.close(); | ||||||
|  |                 } | ||||||
|  |             } else { | ||||||
|  |                 self.conn.maybe_park_read(); | ||||||
|  |                 return Ok(Async::Ready(())); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn poll_write(&mut self) -> Poll<(), ::Error> { | ||||||
|  |         loop { | ||||||
|  |             if self.body_rx.is_none() && self.dispatch.should_poll() { | ||||||
|  |                 if let Some((head, body)) = try_ready!(self.dispatch.poll_msg()) { | ||||||
|  |                     self.conn.write_head(head, body.is_some()); | ||||||
|  |                     self.body_rx = body; | ||||||
|  |                 } else { | ||||||
|  |                     self.conn.close_write(); | ||||||
|  |                     return Ok(Async::Ready(())); | ||||||
|  |                 } | ||||||
|  |             } else if let Some(mut body) = self.body_rx.take() { | ||||||
|  |                 let chunk = match body.poll()? { | ||||||
|  |                     Async::Ready(Some(chunk)) => { | ||||||
|  |                         self.body_rx = Some(body); | ||||||
|  |                         chunk | ||||||
|  |                     }, | ||||||
|  |                     Async::Ready(None) => { | ||||||
|  |                         if self.conn.can_write_body() { | ||||||
|  |                             self.conn.write_body(None)?; | ||||||
|  |                         } | ||||||
|  |                         continue; | ||||||
|  |                     }, | ||||||
|  |                     Async::NotReady => { | ||||||
|  |                         self.body_rx = Some(body); | ||||||
|  |                         return Ok(Async::NotReady); | ||||||
|  |                     } | ||||||
|  |                 }; | ||||||
|  |                 self.conn.write_body(Some(chunk))?; | ||||||
|  |             } else { | ||||||
|  |                 return Ok(Async::NotReady); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn poll_flush(&mut self) -> Poll<(), ::Error> { | ||||||
|  |         self.conn.flush().map_err(|err| { | ||||||
|  |             debug!("error writing: {}", err); | ||||||
|  |             err.into() | ||||||
|  |         }) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn is_done(&self) -> bool { | ||||||
|  |         let read_done = self.conn.is_read_closed(); | ||||||
|  |         let write_done = self.conn.is_write_closed() || | ||||||
|  |             (!self.dispatch.should_poll() && self.body_rx.is_none()); | ||||||
|  |  | ||||||
|  |         read_done && write_done | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  |  | ||||||
|  | impl<D, Bs, I, B, T, K> Future for Dispatcher<D, Bs, I, B, T, K> | ||||||
|  | where | ||||||
|  |     D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>, | ||||||
|  |     I: AsyncRead + AsyncWrite, | ||||||
|  |     B: AsRef<[u8]>, | ||||||
|  |     T: Http1Transaction, | ||||||
|  |     K: KeepAlive, | ||||||
|  |     Bs: Stream<Item=B, Error=::Error>, | ||||||
|  | { | ||||||
|  |     type Item = (); | ||||||
|  |     type Error = ::Error; | ||||||
|  |  | ||||||
|  |     #[inline] | ||||||
|  |     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||||
|  |         self.poll_read()?; | ||||||
|  |         self.poll_write()?; | ||||||
|  |         self.poll_flush()?; | ||||||
|  |  | ||||||
|  |         if self.is_done() { | ||||||
|  |             trace!("Dispatch::poll done"); | ||||||
|  |             Ok(Async::Ready(())) | ||||||
|  |         } else { | ||||||
|  |             Ok(Async::NotReady) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // ===== impl Server ===== | ||||||
|  |  | ||||||
|  | impl<S> Server<S> where S: Service { | ||||||
|  |     pub fn new(service: S) -> Server<S> { | ||||||
|  |         Server { | ||||||
|  |             in_flight: None, | ||||||
|  |             service: service, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<S, Bs> Dispatch for Server<S> | ||||||
|  | where | ||||||
|  |     S: Service<Request=::Request, Response=::Response<Bs>, Error=::Error>, | ||||||
|  |     Bs: Stream<Error=::Error>, | ||||||
|  |     Bs::Item: AsRef<[u8]>, | ||||||
|  | { | ||||||
|  |     type PollItem = MessageHead<StatusCode>; | ||||||
|  |     type PollBody = Bs; | ||||||
|  |     type RecvItem = RequestHead; | ||||||
|  |  | ||||||
|  |     fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error> { | ||||||
|  |         if let Some(mut fut) = self.in_flight.take() { | ||||||
|  |             let resp = match fut.poll()? { | ||||||
|  |                 Async::Ready(res) => res, | ||||||
|  |                 Async::NotReady => { | ||||||
|  |                     self.in_flight = Some(fut); | ||||||
|  |                     return Ok(Async::NotReady); | ||||||
|  |                 } | ||||||
|  |             }; | ||||||
|  |             let (head, body) = super::response::split(resp); | ||||||
|  |             Ok(Async::Ready(Some((head.into(), body)))) | ||||||
|  |         } else { | ||||||
|  |             unreachable!("poll_msg shouldn't be called if no inflight"); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> { | ||||||
|  |         let (msg, body) = msg?; | ||||||
|  |         let req = super::request::from_wire(None, msg, body); | ||||||
|  |         self.in_flight = Some(self.service.call(req)); | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn should_poll(&self) -> bool { | ||||||
|  |         self.in_flight.is_some() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // ===== impl Client ===== | ||||||
|  |  | ||||||
|  | impl<B> Client<B> { | ||||||
|  |     pub fn new(rx: ClientRx<B>) -> Client<B> { | ||||||
|  |         Client { | ||||||
|  |             callback: None, | ||||||
|  |             rx: rx, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<B> Dispatch for Client<B> | ||||||
|  | where | ||||||
|  |     B: Stream<Error=::Error>, | ||||||
|  |     B::Item: AsRef<[u8]>, | ||||||
|  | { | ||||||
|  |     type PollItem = RequestHead; | ||||||
|  |     type PollBody = B; | ||||||
|  |     type RecvItem = ResponseHead; | ||||||
|  |  | ||||||
|  |     fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error> { | ||||||
|  |         match self.rx.poll() { | ||||||
|  |             Ok(Async::Ready(Some((head, body, cb)))) => { | ||||||
|  |                 self.callback = Some(cb); | ||||||
|  |                 Ok(Async::Ready(Some((head, body)))) | ||||||
|  |             }, | ||||||
|  |             Ok(Async::Ready(None)) => { | ||||||
|  |                 // user has dropped sender handle | ||||||
|  |                 Ok(Async::Ready(None)) | ||||||
|  |             }, | ||||||
|  |             Ok(Async::NotReady) => return Ok(Async::NotReady), | ||||||
|  |             Err(()) => unreachable!("mpsc receiver cannot error"), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> { | ||||||
|  |         match msg { | ||||||
|  |             Ok((msg, body)) => { | ||||||
|  |                 let res = super::response::from_wire(msg, Some(body)); | ||||||
|  |                 let cb = self.callback.take().expect("recv_msg without callback"); | ||||||
|  |                 let _ = cb.send(Ok(res)); | ||||||
|  |                 Ok(()) | ||||||
|  |             }, | ||||||
|  |             Err(err) => { | ||||||
|  |                 if let Some(cb) = self.callback.take() { | ||||||
|  |                     let _ = cb.send(Err(err)); | ||||||
|  |                     Ok(()) | ||||||
|  |                 } else { | ||||||
|  |                     Err(err) | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn should_poll(&self) -> bool { | ||||||
|  |         self.callback.is_none() | ||||||
|  |     } | ||||||
|  | } | ||||||
| @@ -132,6 +132,14 @@ impl Http1Transaction for ServerTransaction { | |||||||
|         extend(dst, b"\r\n"); |         extend(dst, b"\r\n"); | ||||||
|         body |         body | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     fn should_error_on_parse_eof() -> bool { | ||||||
|  |         false | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn should_read_first() -> bool { | ||||||
|  |         true | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl ServerTransaction { | impl ServerTransaction { | ||||||
| @@ -281,6 +289,14 @@ impl Http1Transaction for ClientTransaction { | |||||||
|  |  | ||||||
|         body |         body | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     fn should_error_on_parse_eof() -> bool { | ||||||
|  |         true | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn should_read_first() -> bool { | ||||||
|  |         false | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl ClientTransaction { | impl ClientTransaction { | ||||||
|   | |||||||
| @@ -84,7 +84,7 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> { | |||||||
|             match try_ready!(self.read_from_io()) { |             match try_ready!(self.read_from_io()) { | ||||||
|                 0 => { |                 0 => { | ||||||
|                     trace!("parse eof"); |                     trace!("parse eof"); | ||||||
|                     //TODO: With Rust 1.14, this can be Error::from(ErrorKind) |                     //TODO: utilize Error::Incomplete when Error type is redesigned | ||||||
|                     return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ParseEof).into()); |                     return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ParseEof).into()); | ||||||
|                 } |                 } | ||||||
|                 _ => {}, |                 _ => {}, | ||||||
| @@ -335,13 +335,13 @@ struct ParseEof; | |||||||
|  |  | ||||||
| impl fmt::Display for ParseEof { | impl fmt::Display for ParseEof { | ||||||
|     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||||
|         f.write_str("parse eof") |         f.write_str(::std::error::Error::description(self)) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl ::std::error::Error for ParseEof { | impl ::std::error::Error for ParseEof { | ||||||
|     fn description(&self) -> &str { |     fn description(&self) -> &str { | ||||||
|         "parse eof" |         "end of file reached before parsing could complete" | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -19,6 +19,7 @@ pub use self::chunk::Chunk; | |||||||
| mod body; | mod body; | ||||||
| mod chunk; | mod chunk; | ||||||
| mod conn; | mod conn; | ||||||
|  | pub mod dispatch; | ||||||
| mod io; | mod io; | ||||||
| mod h1; | mod h1; | ||||||
| //mod h2; | //mod h2; | ||||||
| @@ -146,6 +147,9 @@ pub trait Http1Transaction { | |||||||
|     fn parse(bytes: &mut BytesMut) -> ParseResult<Self::Incoming>; |     fn parse(bytes: &mut BytesMut) -> ParseResult<Self::Incoming>; | ||||||
|     fn decoder(head: &MessageHead<Self::Incoming>, method: &mut Option<::Method>) -> ::Result<h1::Decoder>; |     fn decoder(head: &MessageHead<Self::Incoming>, method: &mut Option<::Method>) -> ::Result<h1::Decoder>; | ||||||
|     fn encode(head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> h1::Encoder; |     fn encode(head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> h1::Encoder; | ||||||
|  |  | ||||||
|  |     fn should_error_on_parse_eof() -> bool; | ||||||
|  |     fn should_read_first() -> bool; | ||||||
| } | } | ||||||
|  |  | ||||||
| pub type ParseResult<T> = ::Result<Option<(MessageHead<T>, usize)>>; | pub type ParseResult<T> = ::Result<Option<(MessageHead<T>, usize)>>; | ||||||
|   | |||||||
| @@ -66,6 +66,7 @@ where B: Stream<Error=::Error>, | |||||||
|     core: Core, |     core: Core, | ||||||
|     listener: TcpListener, |     listener: TcpListener, | ||||||
|     shutdown_timeout: Duration, |     shutdown_timeout: Duration, | ||||||
|  |     no_proto: bool, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<B: AsRef<[u8]> + 'static> Http<B> { | impl<B: AsRef<[u8]> + 'static> Http<B> { | ||||||
| @@ -121,6 +122,7 @@ impl<B: AsRef<[u8]> + 'static> Http<B> { | |||||||
|             listener: listener, |             listener: listener, | ||||||
|             protocol: self.clone(), |             protocol: self.clone(), | ||||||
|             shutdown_timeout: Duration::new(1, 0), |             shutdown_timeout: Duration::new(1, 0), | ||||||
|  |             no_proto: false, | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -165,6 +167,30 @@ impl<B: AsRef<[u8]> + 'static> Http<B> { | |||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Bind a connection together with a Service. | ||||||
|  |     /// | ||||||
|  |     /// This returns a Future that must be polled in order for HTTP to be | ||||||
|  |     /// driven on the connection. | ||||||
|  |     /// | ||||||
|  |     /// This additionally skips the tokio-proto infrastructure internally. | ||||||
|  |     pub fn no_proto<S, I, Bd>(&self, io: I, service: S) -> Connection<I, Bd, S> | ||||||
|  |         where S: Service<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static, | ||||||
|  |               Bd: Stream<Item=B, Error=::Error> + 'static, | ||||||
|  |               I: AsyncRead + AsyncWrite + 'static, | ||||||
|  |  | ||||||
|  |     { | ||||||
|  |         let ka = if self.keep_alive { | ||||||
|  |             proto::KA::Busy | ||||||
|  |         } else { | ||||||
|  |             proto::KA::Disabled | ||||||
|  |         }; | ||||||
|  |         let mut conn = proto::Conn::new(io, ka); | ||||||
|  |         conn.set_flush_pipeline(self.pipeline); | ||||||
|  |         Connection { | ||||||
|  |             conn: proto::dispatch::Dispatcher::new(proto::dispatch::Server::new(service), conn), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /// Bind a `Service` using types from the `http` crate. |     /// Bind a `Service` using types from the `http` crate. | ||||||
|     /// |     /// | ||||||
|     /// See `Http::bind_connection`. |     /// See `Http::bind_connection`. | ||||||
| @@ -185,6 +211,67 @@ impl<B: AsRef<[u8]> + 'static> Http<B> { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /// A future binding a connection with a Service. | ||||||
|  | /// | ||||||
|  | /// Polling this future will drive HTTP forward. | ||||||
|  | #[must_use = "futures do nothing unless polled"] | ||||||
|  | pub struct Connection<I, B, S> | ||||||
|  | where S: Service, | ||||||
|  |       B: Stream<Error=::Error>, | ||||||
|  |       B::Item: AsRef<[u8]>, | ||||||
|  | { | ||||||
|  |     conn: proto::dispatch::Dispatcher<proto::dispatch::Server<S>, B, I, B::Item, proto::ServerTransaction, proto::KA>, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<I, B, S> Future for Connection<I, B, S> | ||||||
|  | where S: Service<Request = Request, Response = Response<B>, Error = ::Error> + 'static, | ||||||
|  |       I: AsyncRead + AsyncWrite + 'static, | ||||||
|  |       B: Stream<Error=::Error> + 'static, | ||||||
|  |       B::Item: AsRef<[u8]>, | ||||||
|  | { | ||||||
|  |     type Item = self::unnameable::Opaque; | ||||||
|  |     type Error = ::Error; | ||||||
|  |  | ||||||
|  |     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||||
|  |         try_ready!(self.conn.poll()); | ||||||
|  |         Ok(self::unnameable::opaque().into()) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<I, B, S> fmt::Debug for Connection<I, B, S>  | ||||||
|  | where S: Service, | ||||||
|  |       B: Stream<Error=::Error>, | ||||||
|  |       B::Item: AsRef<[u8]>, | ||||||
|  | { | ||||||
|  |     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||||
|  |         f.debug_struct("Connection") | ||||||
|  |             .finish() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | mod unnameable { | ||||||
|  |     // This type is specifically not exported outside the crate, | ||||||
|  |     // so no one can actually name the type. With no methods, we make no | ||||||
|  |     // promises about this type. | ||||||
|  |     // | ||||||
|  |     // All of that to say we can eventually replace the type returned | ||||||
|  |     // to something else, and it would not be a breaking change. | ||||||
|  |     // | ||||||
|  |     // We may want to eventually yield the `T: AsyncRead + AsyncWrite`, which | ||||||
|  |     // doesn't have a `Debug` bound. So, this type can't implement `Debug` | ||||||
|  |     // either, so the type change doesn't break people. | ||||||
|  |     #[allow(missing_debug_implementations)] | ||||||
|  |     pub struct Opaque { | ||||||
|  |         _inner: (), | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn opaque() -> Opaque { | ||||||
|  |         Opaque { | ||||||
|  |             _inner: (), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| impl<B> Clone for Http<B> { | impl<B> Clone for Http<B> { | ||||||
|     fn clone(&self) -> Http<B> { |     fn clone(&self) -> Http<B> { | ||||||
|         Http { |         Http { | ||||||
| @@ -207,7 +294,7 @@ impl<B> fmt::Debug for Http<B> { | |||||||
| pub struct __ProtoRequest(proto::RequestHead); | pub struct __ProtoRequest(proto::RequestHead); | ||||||
| #[doc(hidden)] | #[doc(hidden)] | ||||||
| #[allow(missing_debug_implementations)] | #[allow(missing_debug_implementations)] | ||||||
| pub struct __ProtoResponse(ResponseHead); | pub struct __ProtoResponse(proto::MessageHead<::StatusCode>); | ||||||
| #[doc(hidden)] | #[doc(hidden)] | ||||||
| #[allow(missing_debug_implementations)] | #[allow(missing_debug_implementations)] | ||||||
| pub struct __ProtoTransport<T, B>(proto::Conn<T, B, proto::ServerTransaction>); | pub struct __ProtoTransport<T, B>(proto::Conn<T, B, proto::ServerTransaction>); | ||||||
| @@ -368,8 +455,6 @@ struct HttpService<T> { | |||||||
|     remote_addr: SocketAddr, |     remote_addr: SocketAddr, | ||||||
| } | } | ||||||
|  |  | ||||||
| type ResponseHead = proto::MessageHead<::StatusCode>; |  | ||||||
|  |  | ||||||
| impl<T, B> Service for HttpService<T> | impl<T, B> Service for HttpService<T> | ||||||
|     where T: Service<Request=Request, Response=Response<B>, Error=::Error>, |     where T: Service<Request=Request, Response=Response<B>, Error=::Error>, | ||||||
|           B: Stream<Error=::Error>, |           B: Stream<Error=::Error>, | ||||||
| @@ -420,6 +505,12 @@ impl<S, B> Server<S, B> | |||||||
|         self |         self | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Configure this server to not use tokio-proto infrastructure internally. | ||||||
|  |     pub fn no_proto(&mut self) -> &mut Self { | ||||||
|  |         self.no_proto = true; | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /// Execute this server infinitely. |     /// Execute this server infinitely. | ||||||
|     /// |     /// | ||||||
|     /// This method does not currently return, but it will return an error if |     /// This method does not currently return, but it will return an error if | ||||||
| @@ -444,7 +535,7 @@ impl<S, B> Server<S, B> | |||||||
|     pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()> |     pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()> | ||||||
|         where F: Future<Item = (), Error = ()>, |         where F: Future<Item = (), Error = ()>, | ||||||
|     { |     { | ||||||
|         let Server { protocol, new_service, mut core, listener, shutdown_timeout } = self; |         let Server { protocol, new_service, mut core, listener, shutdown_timeout, no_proto } = self; | ||||||
|         let handle = core.handle(); |         let handle = core.handle(); | ||||||
|  |  | ||||||
|         // Mini future to track the number of active services |         // Mini future to track the number of active services | ||||||
| @@ -460,7 +551,14 @@ impl<S, B> Server<S, B> | |||||||
|                 info: Rc::downgrade(&info), |                 info: Rc::downgrade(&info), | ||||||
|             }; |             }; | ||||||
|             info.borrow_mut().active += 1; |             info.borrow_mut().active += 1; | ||||||
|  |             if no_proto { | ||||||
|  |                 let fut = protocol.no_proto(socket, s) | ||||||
|  |                     .map(|_| ()) | ||||||
|  |                     .map_err(|err| error!("no_proto error: {}", err)); | ||||||
|  |                 handle.spawn(fut); | ||||||
|  |             } else { | ||||||
|                 protocol.bind_connection(&handle, socket, addr, s); |                 protocol.bind_connection(&handle, socket, addr, s); | ||||||
|  |             } | ||||||
|             Ok(()) |             Ok(()) | ||||||
|         }); |         }); | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										312
									
								
								tests/client.rs
									
									
									
									
									
								
							
							
						
						
									
										312
									
								
								tests/client.rs
									
									
									
									
									
								
							| @@ -2,6 +2,7 @@ | |||||||
| extern crate hyper; | extern crate hyper; | ||||||
| extern crate futures; | extern crate futures; | ||||||
| extern crate tokio_core; | extern crate tokio_core; | ||||||
|  | extern crate tokio_io; | ||||||
| extern crate pretty_env_logger; | extern crate pretty_env_logger; | ||||||
|  |  | ||||||
| use std::io::{self, Read, Write}; | use std::io::{self, Read, Write}; | ||||||
| @@ -18,13 +19,24 @@ use futures::sync::oneshot; | |||||||
| use tokio_core::reactor::{Core, Handle}; | use tokio_core::reactor::{Core, Handle}; | ||||||
|  |  | ||||||
| fn client(handle: &Handle) -> Client<HttpConnector> { | fn client(handle: &Handle) -> Client<HttpConnector> { | ||||||
|     Client::new(handle) |     let mut config = Client::configure(); | ||||||
|  |     if env("HYPER_NO_PROTO", "1") { | ||||||
|  |         config = config.no_proto(); | ||||||
|  |     } | ||||||
|  |     config.build(handle) | ||||||
| } | } | ||||||
|  |  | ||||||
| fn s(buf: &[u8]) -> &str { | fn s(buf: &[u8]) -> &str { | ||||||
|     ::std::str::from_utf8(buf).unwrap() |     ::std::str::from_utf8(buf).unwrap() | ||||||
| } | } | ||||||
|  |  | ||||||
|  | fn env(name: &str, val: &str) -> bool { | ||||||
|  |     match ::std::env::var(name) { | ||||||
|  |         Ok(var) => var == val, | ||||||
|  |         Err(_) => false, | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| macro_rules! test { | macro_rules! test { | ||||||
|     ( |     ( | ||||||
|         name: $name:ident, |         name: $name:ident, | ||||||
| @@ -49,9 +61,95 @@ macro_rules! test { | |||||||
|             #![allow(unused)] |             #![allow(unused)] | ||||||
|             use hyper::header::*; |             use hyper::header::*; | ||||||
|             let _ = pretty_env_logger::init(); |             let _ = pretty_env_logger::init(); | ||||||
|  |             let mut core = Core::new().unwrap(); | ||||||
|  |  | ||||||
|  |             let res = test! { | ||||||
|  |                 INNER; | ||||||
|  |                 core: &mut core, | ||||||
|  |                 server: | ||||||
|  |                     expected: $server_expected, | ||||||
|  |                     reply: $server_reply, | ||||||
|  |                 client: | ||||||
|  |                     request: | ||||||
|  |                         method: $client_method, | ||||||
|  |                         url: $client_url, | ||||||
|  |                         headers: [ $($request_headers,)* ], | ||||||
|  |                         body: $request_body, | ||||||
|  |                         proxy: $request_proxy, | ||||||
|  |             }.unwrap(); | ||||||
|  |  | ||||||
|  |  | ||||||
|  |             assert_eq!(res.status(), StatusCode::$client_status); | ||||||
|  |             $( | ||||||
|  |                 assert_eq!(res.headers().get(), Some(&$response_headers)); | ||||||
|  |             )* | ||||||
|  |  | ||||||
|  |             let body = core.run(res.body().concat2()).unwrap(); | ||||||
|  |  | ||||||
|  |             let expected_res_body = Option::<&[u8]>::from($response_body) | ||||||
|  |                 .unwrap_or_default(); | ||||||
|  |             assert_eq!(body.as_ref(), expected_res_body); | ||||||
|  |         } | ||||||
|  |     ); | ||||||
|  |     ( | ||||||
|  |         name: $name:ident, | ||||||
|  |         server: | ||||||
|  |             expected: $server_expected:expr, | ||||||
|  |             reply: $server_reply:expr, | ||||||
|  |         client: | ||||||
|  |             request: | ||||||
|  |                 method: $client_method:ident, | ||||||
|  |                 url: $client_url:expr, | ||||||
|  |                 headers: [ $($request_headers:expr,)* ], | ||||||
|  |                 body: $request_body:expr, | ||||||
|  |                 proxy: $request_proxy:expr, | ||||||
|  |  | ||||||
|  |             error: $err:expr, | ||||||
|  |     ) => ( | ||||||
|  |         #[test] | ||||||
|  |         fn $name() { | ||||||
|  |             #![allow(unused)] | ||||||
|  |             use hyper::header::*; | ||||||
|  |             let _ = pretty_env_logger::init(); | ||||||
|  |             let mut core = Core::new().unwrap(); | ||||||
|  |  | ||||||
|  |             let err = test! { | ||||||
|  |                 INNER; | ||||||
|  |                 core: &mut core, | ||||||
|  |                 server: | ||||||
|  |                     expected: $server_expected, | ||||||
|  |                     reply: $server_reply, | ||||||
|  |                 client: | ||||||
|  |                     request: | ||||||
|  |                         method: $client_method, | ||||||
|  |                         url: $client_url, | ||||||
|  |                         headers: [ $($request_headers,)* ], | ||||||
|  |                         body: $request_body, | ||||||
|  |                         proxy: $request_proxy, | ||||||
|  |             }.unwrap_err(); | ||||||
|  |             if !$err(&err) { | ||||||
|  |                 panic!("unexpected error: {:?}", err) | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     ); | ||||||
|  |  | ||||||
|  |     ( | ||||||
|  |         INNER; | ||||||
|  |         core: $core:expr, | ||||||
|  |         server: | ||||||
|  |             expected: $server_expected:expr, | ||||||
|  |             reply: $server_reply:expr, | ||||||
|  |         client: | ||||||
|  |             request: | ||||||
|  |                 method: $client_method:ident, | ||||||
|  |                 url: $client_url:expr, | ||||||
|  |                 headers: [ $($request_headers:expr,)* ], | ||||||
|  |                 body: $request_body:expr, | ||||||
|  |                 proxy: $request_proxy:expr, | ||||||
|  |     ) => ({ | ||||||
|         let server = TcpListener::bind("127.0.0.1:0").unwrap(); |         let server = TcpListener::bind("127.0.0.1:0").unwrap(); | ||||||
|         let addr = server.local_addr().unwrap(); |         let addr = server.local_addr().unwrap(); | ||||||
|             let mut core = Core::new().unwrap(); |         let mut core = $core; | ||||||
|         let client = client(&core.handle()); |         let client = client(&core.handle()); | ||||||
|         let mut req = Request::new(Method::$client_method, format!($client_url, addr=addr).parse().unwrap()); |         let mut req = Request::new(Method::$client_method, format!($client_url, addr=addr).parse().unwrap()); | ||||||
|         $( |         $( | ||||||
| @@ -93,19 +191,8 @@ macro_rules! test { | |||||||
|  |  | ||||||
|         let work = res.join(rx).map(|r| r.0); |         let work = res.join(rx).map(|r| r.0); | ||||||
|  |  | ||||||
|             let res = core.run(work).unwrap(); |         core.run(work) | ||||||
|             assert_eq!(res.status(), StatusCode::$client_status); |     }); | ||||||
|             $( |  | ||||||
|                 assert_eq!(res.headers().get(), Some(&$response_headers)); |  | ||||||
|             )* |  | ||||||
|  |  | ||||||
|             let body = core.run(res.body().concat2()).unwrap(); |  | ||||||
|  |  | ||||||
|             let expected_res_body = Option::<&[u8]>::from($response_body) |  | ||||||
|                 .unwrap_or_default(); |  | ||||||
|             assert_eq!(body.as_ref(), expected_res_body); |  | ||||||
|         } |  | ||||||
|     ); |  | ||||||
| } | } | ||||||
|  |  | ||||||
| static REPLY_OK: &'static str = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"; | static REPLY_OK: &'static str = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"; | ||||||
| @@ -266,6 +353,93 @@ test! { | |||||||
|             body: None, |             body: None, | ||||||
| } | } | ||||||
|  |  | ||||||
|  | test! { | ||||||
|  |     name: client_pipeline_responses_extra, | ||||||
|  |  | ||||||
|  |     server: | ||||||
|  |         expected: "\ | ||||||
|  |             GET /pipe HTTP/1.1\r\n\ | ||||||
|  |             Host: {addr}\r\n\ | ||||||
|  |             \r\n\ | ||||||
|  |             ", | ||||||
|  |         reply: "\ | ||||||
|  |             HTTP/1.1 200 OK\r\n\ | ||||||
|  |             Content-Length: 0\r\n\ | ||||||
|  |             \r\n\ | ||||||
|  |             HTTP/1.1 200 OK\r\n\ | ||||||
|  |             Content-Length: 0\r\n\ | ||||||
|  |             \r\n\ | ||||||
|  |             ", | ||||||
|  |  | ||||||
|  |     client: | ||||||
|  |         request: | ||||||
|  |             method: Get, | ||||||
|  |             url: "http://{addr}/pipe", | ||||||
|  |             headers: [], | ||||||
|  |             body: None, | ||||||
|  |             proxy: false, | ||||||
|  |         response: | ||||||
|  |             status: Ok, | ||||||
|  |             headers: [], | ||||||
|  |             body: None, | ||||||
|  | } | ||||||
|  |  | ||||||
|  |  | ||||||
|  | test! { | ||||||
|  |     name: client_error_unexpected_eof, | ||||||
|  |  | ||||||
|  |     server: | ||||||
|  |         expected: "\ | ||||||
|  |             GET /err HTTP/1.1\r\n\ | ||||||
|  |             Host: {addr}\r\n\ | ||||||
|  |             \r\n\ | ||||||
|  |             ", | ||||||
|  |         reply: "\ | ||||||
|  |             HTTP/1.1 200 OK\r\n\ | ||||||
|  |             ", // unexpected eof before double CRLF | ||||||
|  |  | ||||||
|  |     client: | ||||||
|  |         request: | ||||||
|  |             method: Get, | ||||||
|  |             url: "http://{addr}/err", | ||||||
|  |             headers: [], | ||||||
|  |             body: None, | ||||||
|  |             proxy: false, | ||||||
|  |         error: |err| match err { | ||||||
|  |             &hyper::Error::Io(_) => true, | ||||||
|  |             _ => false, | ||||||
|  |         }, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | test! { | ||||||
|  |     name: client_error_parse_version, | ||||||
|  |  | ||||||
|  |     server: | ||||||
|  |         expected: "\ | ||||||
|  |             GET /err HTTP/1.1\r\n\ | ||||||
|  |             Host: {addr}\r\n\ | ||||||
|  |             \r\n\ | ||||||
|  |             ", | ||||||
|  |         reply: "\ | ||||||
|  |             HEAT/1.1 200 OK\r\n\ | ||||||
|  |             \r\n\ | ||||||
|  |             ", | ||||||
|  |  | ||||||
|  |     client: | ||||||
|  |         request: | ||||||
|  |             method: Get, | ||||||
|  |             url: "http://{addr}/err", | ||||||
|  |             headers: [], | ||||||
|  |             body: None, | ||||||
|  |             proxy: false, | ||||||
|  |         error: |err| match err { | ||||||
|  |             &hyper::Error::Version if env("HYPER_NO_PROTO", "1") => true, | ||||||
|  |             &hyper::Error::Io(_) if !env("HYPER_NO_PROTO", "1") => true, | ||||||
|  |             _ => false, | ||||||
|  |         }, | ||||||
|  |  | ||||||
|  | } | ||||||
|  |  | ||||||
| #[test] | #[test] | ||||||
| fn client_keep_alive() { | fn client_keep_alive() { | ||||||
|     let server = TcpListener::bind("127.0.0.1:0").unwrap(); |     let server = TcpListener::bind("127.0.0.1:0").unwrap(); | ||||||
| @@ -285,9 +459,10 @@ fn client_keep_alive() { | |||||||
|         sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 1"); |         sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 1"); | ||||||
|         let _ = tx1.send(()); |         let _ = tx1.send(()); | ||||||
|  |  | ||||||
|         sock.read(&mut buf).expect("read 2"); |         let n2 = sock.read(&mut buf).expect("read 2"); | ||||||
|         let second_get = b"GET /b HTTP/1.1\r\n"; |         assert_ne!(n2, 0); | ||||||
|         assert_eq!(&buf[..second_get.len()], second_get); |         let second_get = "GET /b HTTP/1.1\r\n"; | ||||||
|  |         assert_eq!(s(&buf[..second_get.len()]), second_get); | ||||||
|         sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 2"); |         sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 2"); | ||||||
|         let _ = tx2.send(()); |         let _ = tx2.send(()); | ||||||
|     }); |     }); | ||||||
| @@ -367,3 +542,104 @@ fn client_pooled_socket_disconnected() { | |||||||
|     assert_ne!(addr1, addr2); |     assert_ne!(addr1, addr2); | ||||||
| } | } | ||||||
| */ | */ | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn drop_body_before_eof_closes_connection() { | ||||||
|  |     // https://github.com/hyperium/hyper/issues/1353 | ||||||
|  |     use std::io::{self, Read, Write}; | ||||||
|  |     use std::sync::Arc; | ||||||
|  |     use std::sync::atomic::{AtomicUsize, Ordering}; | ||||||
|  |     use std::time::Duration; | ||||||
|  |     use tokio_core::reactor::{Timeout}; | ||||||
|  |     use tokio_core::net::TcpStream; | ||||||
|  |     use tokio_io::{AsyncRead, AsyncWrite}; | ||||||
|  |     use hyper::client::HttpConnector; | ||||||
|  |     use hyper::server::Service; | ||||||
|  |     use hyper::Uri; | ||||||
|  |  | ||||||
|  |     let _ = pretty_env_logger::init(); | ||||||
|  |  | ||||||
|  |     let server = TcpListener::bind("127.0.0.1:0").unwrap(); | ||||||
|  |     let addr = server.local_addr().unwrap(); | ||||||
|  |     let mut core = Core::new().unwrap(); | ||||||
|  |     let handle = core.handle(); | ||||||
|  |     let closes = Arc::new(AtomicUsize::new(0)); | ||||||
|  |     let client = Client::configure() | ||||||
|  |         .connector(DebugConnector(HttpConnector::new(1, &core.handle()), closes.clone())) | ||||||
|  |         .no_proto() | ||||||
|  |         .build(&handle); | ||||||
|  |  | ||||||
|  |     let (tx1, rx1) = oneshot::channel(); | ||||||
|  |  | ||||||
|  |     thread::spawn(move || { | ||||||
|  |         let mut sock = server.accept().unwrap().0; | ||||||
|  |         sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); | ||||||
|  |         sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); | ||||||
|  |         let mut buf = [0; 4096]; | ||||||
|  |         sock.read(&mut buf).expect("read 1"); | ||||||
|  |         let body = vec![b'x'; 1024 * 128]; | ||||||
|  |         write!(sock, "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", body.len()).expect("write head"); | ||||||
|  |         let _ = sock.write_all(&body); | ||||||
|  |         let _ = tx1.send(()); | ||||||
|  |     }); | ||||||
|  |  | ||||||
|  |     let uri = format!("http://{}/a", addr).parse().unwrap(); | ||||||
|  |  | ||||||
|  |     let res = client.get(uri).and_then(move |res| { | ||||||
|  |         assert_eq!(res.status(), hyper::StatusCode::Ok); | ||||||
|  |         Timeout::new(Duration::from_secs(1), &handle).unwrap() | ||||||
|  |             .from_err() | ||||||
|  |     }); | ||||||
|  |     let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); | ||||||
|  |     core.run(res.join(rx).map(|r| r.0)).unwrap(); | ||||||
|  |  | ||||||
|  |     assert_eq!(closes.load(Ordering::Relaxed), 1); | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     struct DebugConnector(HttpConnector, Arc<AtomicUsize>); | ||||||
|  |  | ||||||
|  |     impl Service for DebugConnector { | ||||||
|  |         type Request = Uri; | ||||||
|  |         type Response = DebugStream; | ||||||
|  |         type Error = io::Error; | ||||||
|  |         type Future = Box<Future<Item = DebugStream, Error = io::Error>>; | ||||||
|  |  | ||||||
|  |         fn call(&self, uri: Uri) -> Self::Future { | ||||||
|  |             let counter = self.1.clone(); | ||||||
|  |             Box::new(self.0.call(uri).map(move |s| DebugStream(s, counter))) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     struct DebugStream(TcpStream, Arc<AtomicUsize>); | ||||||
|  |  | ||||||
|  |     impl Drop for DebugStream { | ||||||
|  |         fn drop(&mut self) { | ||||||
|  |             self.1.fetch_add(1, Ordering::SeqCst); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     impl Write for DebugStream { | ||||||
|  |         fn write(&mut self, buf: &[u8]) -> io::Result<usize> { | ||||||
|  |             self.0.write(buf) | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         fn flush(&mut self) -> io::Result<()> { | ||||||
|  |             self.0.flush() | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     impl AsyncWrite for DebugStream { | ||||||
|  |         fn shutdown(&mut self) -> futures::Poll<(), io::Error> { | ||||||
|  |             AsyncWrite::shutdown(&mut self.0) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     impl Read for DebugStream { | ||||||
|  |         fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||||||
|  |             self.0.read(buf) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     impl AsyncRead for DebugStream {} | ||||||
|  | } | ||||||
|   | |||||||
| @@ -3,10 +3,15 @@ extern crate hyper; | |||||||
| extern crate futures; | extern crate futures; | ||||||
| extern crate spmc; | extern crate spmc; | ||||||
| extern crate pretty_env_logger; | extern crate pretty_env_logger; | ||||||
|  | extern crate tokio_core; | ||||||
|  |  | ||||||
| use futures::{Future, Stream}; | use futures::{Future, Stream}; | ||||||
|  | use futures::future::{self, FutureResult}; | ||||||
| use futures::sync::oneshot; | use futures::sync::oneshot; | ||||||
|  |  | ||||||
|  | use tokio_core::net::TcpListener; | ||||||
|  | use tokio_core::reactor::Core; | ||||||
|  |  | ||||||
| use std::net::{TcpStream, SocketAddr}; | use std::net::{TcpStream, SocketAddr}; | ||||||
| use std::io::{Read, Write}; | use std::io::{Read, Write}; | ||||||
| use std::sync::mpsc; | use std::sync::mpsc; | ||||||
| @@ -387,6 +392,7 @@ fn disable_keep_alive() { | |||||||
|         .header(hyper::header::ContentLength(quux.len() as u64)) |         .header(hyper::header::ContentLength(quux.len() as u64)) | ||||||
|         .body(quux); |         .body(quux); | ||||||
|  |  | ||||||
|  |     // the write can possibly succeed, since it fills the kernel buffer on the first write | ||||||
|     let _ = req.write_all(b"\ |     let _ = req.write_all(b"\ | ||||||
|         GET /quux HTTP/1.1\r\n\ |         GET /quux HTTP/1.1\r\n\ | ||||||
|         Host: example.domain\r\n\ |         Host: example.domain\r\n\ | ||||||
| @@ -394,7 +400,6 @@ fn disable_keep_alive() { | |||||||
|         \r\n\ |         \r\n\ | ||||||
|     "); |     "); | ||||||
|  |  | ||||||
|     // the write can possibly succeed, since it fills the kernel buffer on the first write |  | ||||||
|     let mut buf = [0; 1024 * 8]; |     let mut buf = [0; 1024 * 8]; | ||||||
|     match req.read(&mut buf[..]) { |     match req.read(&mut buf[..]) { | ||||||
|         // Ok(0) means EOF, so a proper shutdown |         // Ok(0) means EOF, so a proper shutdown | ||||||
| @@ -504,6 +509,50 @@ fn pipeline_enabled() { | |||||||
|     assert_eq!(n, 0); |     assert_eq!(n, 0); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn no_proto_empty_parse_eof_does_not_return_error() { | ||||||
|  |     let mut core = Core::new().unwrap(); | ||||||
|  |     let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); | ||||||
|  |     let addr = listener.local_addr().unwrap(); | ||||||
|  |  | ||||||
|  |     thread::spawn(move || { | ||||||
|  |         let _tcp = connect(&addr); | ||||||
|  |     }); | ||||||
|  |  | ||||||
|  |     let fut = listener.incoming() | ||||||
|  |         .into_future() | ||||||
|  |         .map_err(|_| unreachable!()) | ||||||
|  |         .and_then(|(item, _incoming)| { | ||||||
|  |             let (socket, _) = item.unwrap(); | ||||||
|  |             Http::new().no_proto(socket, HelloWorld) | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |     core.run(fut).unwrap(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn no_proto_nonempty_parse_eof_returns_error() { | ||||||
|  |     let mut core = Core::new().unwrap(); | ||||||
|  |     let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); | ||||||
|  |     let addr = listener.local_addr().unwrap(); | ||||||
|  |  | ||||||
|  |     thread::spawn(move || { | ||||||
|  |         let mut tcp = connect(&addr); | ||||||
|  |         tcp.write_all(b"GET / HTTP/1.1").unwrap(); | ||||||
|  |     }); | ||||||
|  |  | ||||||
|  |     let fut = listener.incoming() | ||||||
|  |         .into_future() | ||||||
|  |         .map_err(|_| unreachable!()) | ||||||
|  |         .and_then(|(item, _incoming)| { | ||||||
|  |             let (socket, _) = item.unwrap(); | ||||||
|  |             Http::new().no_proto(socket, HelloWorld) | ||||||
|  |                 .map(|_| ()) | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |     core.run(fut).unwrap_err(); | ||||||
|  | } | ||||||
|  |  | ||||||
| // ------------------------------------------------- | // ------------------------------------------------- | ||||||
| // the Server that is used to run all the tests with | // the Server that is used to run all the tests with | ||||||
| // ------------------------------------------------- | // ------------------------------------------------- | ||||||
| @@ -628,6 +677,20 @@ impl Service for TestService { | |||||||
|  |  | ||||||
| } | } | ||||||
|  |  | ||||||
|  | struct HelloWorld; | ||||||
|  |  | ||||||
|  | impl Service for HelloWorld { | ||||||
|  |     type Request = Request; | ||||||
|  |     type Response = Response; | ||||||
|  |     type Error = hyper::Error; | ||||||
|  |     type Future = FutureResult<Self::Response, Self::Error>; | ||||||
|  |  | ||||||
|  |     fn call(&self, _req: Request) -> Self::Future { | ||||||
|  |         future::ok(Response::new()) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  |  | ||||||
| fn connect(addr: &SocketAddr) -> TcpStream { | fn connect(addr: &SocketAddr) -> TcpStream { | ||||||
|     let req = TcpStream::connect(addr).unwrap(); |     let req = TcpStream::connect(addr).unwrap(); | ||||||
|     req.set_read_timeout(Some(Duration::from_secs(1))).unwrap(); |     req.set_read_timeout(Some(Duration::from_secs(1))).unwrap(); | ||||||
| @@ -639,13 +702,31 @@ fn serve() -> Serve { | |||||||
|     serve_with_options(Default::default()) |     serve_with_options(Default::default()) | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Default)] |  | ||||||
| struct ServeOptions { | struct ServeOptions { | ||||||
|     keep_alive_disabled: bool, |     keep_alive_disabled: bool, | ||||||
|  |     no_proto: bool, | ||||||
|     pipeline: bool, |     pipeline: bool, | ||||||
|     timeout: Option<Duration>, |     timeout: Option<Duration>, | ||||||
| } | } | ||||||
|  |  | ||||||
|  | impl Default for ServeOptions { | ||||||
|  |     fn default() -> Self { | ||||||
|  |         ServeOptions { | ||||||
|  |             keep_alive_disabled: false, | ||||||
|  |             no_proto: env("HYPER_NO_PROTO", "1"), | ||||||
|  |             pipeline: false, | ||||||
|  |             timeout: None, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn env(name: &str, val: &str) -> bool { | ||||||
|  |     match ::std::env::var(name) { | ||||||
|  |         Ok(var) => var == val, | ||||||
|  |         Err(_) => false, | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| fn serve_with_options(options: ServeOptions) -> Serve { | fn serve_with_options(options: ServeOptions) -> Serve { | ||||||
|     let _ = pretty_env_logger::init(); |     let _ = pretty_env_logger::init(); | ||||||
|  |  | ||||||
| @@ -657,12 +738,13 @@ fn serve_with_options(options: ServeOptions) -> Serve { | |||||||
|     let addr = "127.0.0.1:0".parse().unwrap(); |     let addr = "127.0.0.1:0".parse().unwrap(); | ||||||
|  |  | ||||||
|     let keep_alive = !options.keep_alive_disabled; |     let keep_alive = !options.keep_alive_disabled; | ||||||
|  |     let no_proto = !options.no_proto; | ||||||
|     let pipeline = options.pipeline; |     let pipeline = options.pipeline; | ||||||
|     let dur = options.timeout; |     let dur = options.timeout; | ||||||
|  |  | ||||||
|     let thread_name = format!("test-server-{:?}", dur); |     let thread_name = format!("test-server-{:?}", dur); | ||||||
|     let thread = thread::Builder::new().name(thread_name).spawn(move || { |     let thread = thread::Builder::new().name(thread_name).spawn(move || { | ||||||
|         let srv = Http::new() |         let mut srv = Http::new() | ||||||
|             .keep_alive(keep_alive) |             .keep_alive(keep_alive) | ||||||
|             .pipeline(pipeline) |             .pipeline(pipeline) | ||||||
|             .bind(&addr, TestService { |             .bind(&addr, TestService { | ||||||
| @@ -670,6 +752,9 @@ fn serve_with_options(options: ServeOptions) -> Serve { | |||||||
|                 _timeout: dur, |                 _timeout: dur, | ||||||
|                 reply: reply_rx, |                 reply: reply_rx, | ||||||
|             }).unwrap(); |             }).unwrap(); | ||||||
|  |         if no_proto { | ||||||
|  |             srv.no_proto(); | ||||||
|  |         } | ||||||
|         addr_tx.send(srv.local_addr().unwrap()).unwrap(); |         addr_tx.send(srv.local_addr().unwrap()).unwrap(); | ||||||
|         srv.run_until(shutdown_rx.then(|_| Ok(()))).unwrap(); |         srv.run_until(shutdown_rx.then(|_| Ok(()))).unwrap(); | ||||||
|     }).unwrap(); |     }).unwrap(); | ||||||
| @@ -684,5 +769,3 @@ fn serve_with_options(options: ServeOptions) -> Serve { | |||||||
|         thread: Some(thread), |         thread: Some(thread), | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user