From b60d4cda3d6145fd13589e31a4b035371bb3ba7e Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 9 Nov 2017 15:45:13 -0800 Subject: [PATCH] chore(server): setup ServerProto pieces to be deprecated - Adds a `server-proto` feature that is added to default features. - If `server-proto` feature is not enabled, pieces that will eventually be deprecated and optional will be tagged deprecated, but with a note about the missing `server-proto` feature. --- Cargo.toml | 3 +- src/lib.rs | 20 ++- src/proto/body.rs | 4 + src/server/mod.rs | 289 +++++-------------------------------- src/server/server_proto.rs | 265 ++++++++++++++++++++++++++++++++++ 5 files changed, 323 insertions(+), 258 deletions(-) create mode 100644 src/server/server_proto.rs diff --git a/Cargo.toml b/Cargo.toml index 176d6ca1..604b8118 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,8 @@ pretty_env_logger = "0.1" spmc = "0.2" [features] -default = [] +default = ["server-proto"] nightly = [] raw_status = [] compat = [ "http" ] +server-proto = [] diff --git a/src/lib.rs b/src/lib.rs index ab177bba..d21eb463 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,8 +39,6 @@ extern crate unicase; #[cfg(all(test, feature = "nightly"))] extern crate test; - -mod common; pub use uri::Uri; pub use client::Client; pub use error::{Result, Error}; @@ -55,6 +53,24 @@ pub use version::HttpVersion; #[cfg(feature = "raw_status")] pub use proto::RawStatus; +macro_rules! feat_server_proto { + ($($i:item)*) => ($( + #[cfg_attr( + not(feature = "server-proto"), + deprecated( + since="0.11.7", + note="server-proto was recently added to default features, but you have disabled default features. A future version will remove these types if the server-proto feature is not enabled." + ) + )] + #[cfg_attr( + not(feature = "server-proto"), + allow(deprecated) + )] + $i + )*) +} + +mod common; #[cfg(test)] mod mock; pub mod client; diff --git a/src/proto/body.rs b/src/proto/body.rs index cd7935f5..3f89fdcf 100644 --- a/src/proto/body.rs +++ b/src/proto/body.rs @@ -47,6 +47,8 @@ impl Stream for Body { } } +// deprecate soon, but can't really deprecate trait impls +#[doc(hidden)] impl From for tokio_proto::streaming::Body { #[inline] fn from(b: Body) -> tokio_proto::streaming::Body { @@ -54,6 +56,8 @@ impl From for tokio_proto::streaming::Body { } } +// deprecate soon, but can't really deprecate trait impls +#[doc(hidden)] impl From> for Body { #[inline] fn from(tokio_body: tokio_proto::streaming::Body) -> Body { diff --git a/src/server/mod.rs b/src/server/mod.rs index 57614dda..970f0a2e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -17,8 +17,8 @@ use std::rc::{Rc, Weak}; use std::time::Duration; use futures::task::{self, Task}; -use futures::future::{self, Map}; -use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink}; +use futures::future::{self}; +use futures::{Future, Stream, Poll, Async}; #[cfg(feature = "compat")] use http; @@ -26,14 +26,9 @@ use http; use tokio_io::{AsyncRead, AsyncWrite}; use tokio::reactor::{Core, Handle, Timeout}; use tokio::net::{TcpListener, TcpStream}; -use tokio_proto::BindServer; -use tokio_proto::streaming::Message; -use tokio_proto::streaming::pipeline::{Transport, Frame, ServerProto}; pub use tokio_service::{NewService, Service}; use proto; -use proto::response; -use proto::request; #[cfg(feature = "compat")] use proto::Body; use self::hyper_service::HyperService; @@ -41,6 +36,16 @@ use self::hyper_service::HyperService; pub use proto::response::Response; pub use proto::request::Request; +feat_server_proto! { + mod server_proto; + pub use self::server_proto::{ + __ProtoRequest, + __ProtoResponse, + __ProtoTransport, + __ProtoBindTransport, + }; +} + /// An instance of the HTTP protocol, and implementation of tokio-proto's /// `ServerProto` trait. /// @@ -92,7 +97,7 @@ pub struct SpawnAll { /// A stream of connections from binding to an address. #[must_use = "streams do nothing unless polled"] #[derive(Debug)] -pub struct AddrStream { +pub struct AddrIncoming { addr: SocketAddr, listener: TcpListener, } @@ -119,17 +124,24 @@ where // ===== impl Http ===== -impl + 'static> Http { - /// Creates a new instance of the HTTP protocol, ready to spawn a server or - /// start accepting connections. - pub fn new() -> Http { - Http { - keep_alive: true, - pipeline: false, - _marker: PhantomData, +// This is wrapped in this macro because using `Http` as a `ServerProto` will +// never trigger a deprecation warning, so we have to annoy more people to +// protect some others. +feat_server_proto! { + impl + 'static> Http { + /// Creates a new instance of the HTTP protocol, ready to spawn a server or + /// start accepting connections. + pub fn new() -> Http { + Http { + keep_alive: true, + pipeline: false, + _marker: PhantomData, + } } } +} +impl + 'static> Http { /// Enables or disables HTTP keep-alive. /// /// Default is true. @@ -189,55 +201,6 @@ impl + 'static> Http { self.bind(addr, self::compat_impl::new_service(new_service)) } - /// Use this `Http` instance to create a new server task which handles the - /// connection `io` provided. - /// - /// This is the low-level method used to actually spawn handling a TCP - /// connection, typically. The `handle` provided is the event loop on which - /// the server task will be spawned, `io` is the I/O object associated with - /// this connection (data that's read/written), `remote_addr` is the remote - /// peer address of the HTTP client, and `service` defines how HTTP requests - /// will be handled (and mapped to responses). - /// - /// This method is typically not invoked directly but is rather transitively - /// used through [`bind`](#method.bind). This can be useful, - /// however, when writing mocks or accepting sockets from a non-TCP - /// location. - pub fn bind_connection(&self, - handle: &Handle, - io: I, - remote_addr: SocketAddr, - service: S) - where S: Service, Error = ::Error> + 'static, - Bd: Stream + 'static, - I: AsyncRead + AsyncWrite + 'static, - { - self.bind_server(handle, io, HttpService { - inner: service, - remote_addr: remote_addr, - }) - } - - - /// Bind a `Service` using types from the `http` crate. - /// - /// See `Http::bind_connection`. - #[cfg(feature = "compat")] - pub fn bind_connection_compat(&self, - handle: &Handle, - io: I, - remote_addr: SocketAddr, - service: S) - where S: Service, Response = http::Response, Error = ::Error> + 'static, - Bd: Stream + 'static, - I: AsyncRead + AsyncWrite + 'static, - { - self.bind_server(handle, io, HttpService { - inner: compat_impl::service(service), - remote_addr: remote_addr, - }) - } - /// This method allows the ability to share a `Core` with multiple servers. /// /// Bind the provided `addr` and return a server with a shared `Core`. @@ -246,12 +209,12 @@ impl + 'static> Http { /// to accept connections. Each connection will be processed with the /// `new_service` object provided as well, creating a new service per /// connection. - pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> + pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> where S: NewService, Error = ::Error>, Bd: Stream, { let listener = TcpListener::bind(addr, &handle)?; - let incoming = AddrStream { + let incoming = AddrIncoming { addr: listener.local_addr()?, listener: listener, }; @@ -319,192 +282,7 @@ impl fmt::Debug for Http { } } -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct __ProtoRequest(proto::RequestHead); -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct __ProtoResponse(proto::MessageHead<::StatusCode>); -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct __ProtoTransport(proto::Conn); -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct __ProtoBindTransport { - inner: future::FutureResult, io::Error>, -} -impl ServerProto for Http - where T: AsyncRead + AsyncWrite + 'static, - B: AsRef<[u8]> + 'static, -{ - type Request = __ProtoRequest; - type RequestBody = proto::Chunk; - type Response = __ProtoResponse; - type ResponseBody = B; - type Error = ::Error; - type Transport = __ProtoTransport; - type BindTransport = __ProtoBindTransport; - - #[inline] - fn bind_transport(&self, io: T) -> Self::BindTransport { - let ka = if self.keep_alive { - proto::KA::Busy - } else { - proto::KA::Disabled - }; - let mut conn = proto::Conn::new(io, ka); - conn.set_flush_pipeline(self.pipeline); - __ProtoBindTransport { - inner: future::ok(conn), - } - } -} - -impl Sink for __ProtoTransport - where T: AsyncRead + AsyncWrite + 'static, - B: AsRef<[u8]> + 'static, -{ - type SinkItem = Frame<__ProtoResponse, B, ::Error>; - type SinkError = io::Error; - - #[inline] - fn start_send(&mut self, item: Self::SinkItem) - -> StartSend { - let item = match item { - Frame::Message { message, body } => { - Frame::Message { message: message.0, body: body } - } - Frame::Body { chunk } => Frame::Body { chunk: chunk }, - Frame::Error { error } => Frame::Error { error: error }, - }; - match try!(self.0.start_send(item)) { - AsyncSink::Ready => Ok(AsyncSink::Ready), - AsyncSink::NotReady(Frame::Message { message, body }) => { - Ok(AsyncSink::NotReady(Frame::Message { - message: __ProtoResponse(message), - body: body, - })) - } - AsyncSink::NotReady(Frame::Body { chunk }) => { - Ok(AsyncSink::NotReady(Frame::Body { chunk: chunk })) - } - AsyncSink::NotReady(Frame::Error { error }) => { - Ok(AsyncSink::NotReady(Frame::Error { error: error })) - } - } - } - - #[inline] - fn poll_complete(&mut self) -> Poll<(), io::Error> { - self.0.poll_complete() - } - - #[inline] - fn close(&mut self) -> Poll<(), io::Error> { - self.0.close() - } -} - -impl Stream for __ProtoTransport - where T: AsyncRead + AsyncWrite + 'static, - B: AsRef<[u8]> + 'static, -{ - type Item = Frame<__ProtoRequest, proto::Chunk, ::Error>; - type Error = io::Error; - - #[inline] - fn poll(&mut self) -> Poll, io::Error> { - let item = match try_ready!(self.0.poll()) { - Some(item) => item, - None => return Ok(None.into()), - }; - let item = match item { - Frame::Message { message, body } => { - Frame::Message { message: __ProtoRequest(message), body: body } - } - Frame::Body { chunk } => Frame::Body { chunk: chunk }, - Frame::Error { error } => Frame::Error { error: error }, - }; - Ok(Some(item).into()) - } -} - -impl Transport for __ProtoTransport - where T: AsyncRead + AsyncWrite + 'static, - B: AsRef<[u8]> + 'static, -{ - #[inline] - fn tick(&mut self) { - self.0.tick() - } - - #[inline] - fn cancel(&mut self) -> io::Result<()> { - self.0.cancel() - } -} - -impl Future for __ProtoBindTransport - where T: AsyncRead + AsyncWrite + 'static, -{ - type Item = __ProtoTransport; - type Error = io::Error; - - #[inline] - fn poll(&mut self) -> Poll<__ProtoTransport, io::Error> { - self.inner.poll().map(|a| a.map(__ProtoTransport)) - } -} - -impl From> for Request { - #[inline] - fn from(message: Message<__ProtoRequest, proto::TokioBody>) -> Request { - let (head, body) = match message { - Message::WithoutBody(head) => (head.0, proto::Body::empty()), - Message::WithBody(head, body) => (head.0, body.into()), - }; - request::from_wire(None, head, body) - } -} - -impl Into> for Response { - #[inline] - fn into(self) -> Message<__ProtoResponse, B> { - let (head, body) = response::split(self); - if let Some(body) = body { - Message::WithBody(__ProtoResponse(head), body.into()) - } else { - Message::WithoutBody(__ProtoResponse(head)) - } - } -} - -struct HttpService { - inner: T, - remote_addr: SocketAddr, -} - -impl Service for HttpService - where T: Service, Error=::Error>, - B: Stream, - B::Item: AsRef<[u8]>, -{ - type Request = Message<__ProtoRequest, proto::TokioBody>; - type Response = Message<__ProtoResponse, B>; - type Error = ::Error; - type Future = Map) -> Message<__ProtoResponse, B>>; - - #[inline] - fn call(&self, message: Self::Request) -> Self::Future { - let (head, body) = match message { - Message::WithoutBody(head) => (head.0, proto::Body::empty()), - Message::WithBody(head, body) => (head.0, body.into()), - }; - let req = request::from_wire(Some(self.remote_addr), head, body); - self.inner.call(req).map(Into::into) - } -} // ===== impl Server ===== @@ -590,6 +368,7 @@ impl Server .map_err(|err| error!("no_proto error: {}", err)); handle.spawn(fut); } else { + #[allow(deprecated)] protocol.bind_connection(&handle, socket, addr, s); } Ok(()) @@ -775,16 +554,16 @@ mod unnameable { } } -// ===== impl AddrStream ===== +// ===== impl AddrIncoming ===== -impl AddrStream { +impl AddrIncoming { /// Get the local address bound to this listener. pub fn local_addr(&self) -> SocketAddr { self.addr } } -impl Stream for AddrStream { +impl Stream for AddrIncoming { type Item = TcpStream; type Error = ::std::io::Error; diff --git a/src/server/server_proto.rs b/src/server/server_proto.rs new file mode 100644 index 00000000..c18c0513 --- /dev/null +++ b/src/server/server_proto.rs @@ -0,0 +1,265 @@ +//! The tokio-proto `ServerProto` machinery. +//! +//! Not to be confused with `hyper::proto`. +//! +//! Will be deprecated soon. + +use std::io; +use std::net::SocketAddr; + +#[cfg(feature = "compat")] +use http; +use futures::future::{self, Map}; +use futures::{Future, Stream, Poll, Sink, StartSend, AsyncSink}; +use tokio::reactor::Handle; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_proto::BindServer; +use tokio_proto::streaming::Message; +use tokio_proto::streaming::pipeline::{Transport, Frame, ServerProto}; +use tokio_service::Service; + +use {Request, Response}; +use proto::{self, request, response}; +#[cfg(feature = "compat")] +use proto::Body; +#[cfg(feature = "compat")] +use super::compat_impl; +use super::Http; + +impl + 'static> Http { + /// Use this `Http` instance to create a new server task which handles the + /// connection `io` provided. + /// + /// This is the low-level method used to actually spawn handling a TCP + /// connection, typically. The `handle` provided is the event loop on which + /// the server task will be spawned, `io` is the I/O object associated with + /// this connection (data that's read/written), `remote_addr` is the remote + /// peer address of the HTTP client, and `service` defines how HTTP requests + /// will be handled (and mapped to responses). + /// + /// This method is typically not invoked directly but is rather transitively + /// used through [`bind`](#method.bind). This can be useful, + /// however, when writing mocks or accepting sockets from a non-TCP + /// location. + pub fn bind_connection(&self, + handle: &Handle, + io: I, + remote_addr: SocketAddr, + service: S) + where S: Service, Error = ::Error> + 'static, + Bd: Stream + 'static, + I: AsyncRead + AsyncWrite + 'static, + { + self.bind_server(handle, io, HttpService { + inner: service, + remote_addr: remote_addr, + }) + } + + + /// Bind a `Service` using types from the `http` crate. + /// + /// See `Http::bind_connection`. + #[cfg(feature = "compat")] + pub fn bind_connection_compat(&self, + handle: &Handle, + io: I, + remote_addr: SocketAddr, + service: S) + where S: Service, Response = http::Response, Error = ::Error> + 'static, + Bd: Stream + 'static, + I: AsyncRead + AsyncWrite + 'static, + { + self.bind_server(handle, io, HttpService { + inner: compat_impl::service(service), + remote_addr: remote_addr, + }) + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct __ProtoRequest(proto::RequestHead); +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct __ProtoResponse(proto::MessageHead<::StatusCode>); +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct __ProtoTransport(proto::Conn); +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct __ProtoBindTransport { + inner: future::FutureResult, io::Error>, +} + +impl ServerProto for Http + where T: AsyncRead + AsyncWrite + 'static, + B: AsRef<[u8]> + 'static, +{ + type Request = __ProtoRequest; + type RequestBody = proto::Chunk; + type Response = __ProtoResponse; + type ResponseBody = B; + type Error = ::Error; + type Transport = __ProtoTransport; + type BindTransport = __ProtoBindTransport; + + #[inline] + fn bind_transport(&self, io: T) -> Self::BindTransport { + let ka = if self.keep_alive { + proto::KA::Busy + } else { + proto::KA::Disabled + }; + let mut conn = proto::Conn::new(io, ka); + conn.set_flush_pipeline(self.pipeline); + __ProtoBindTransport { + inner: future::ok(conn), + } + } +} + +impl Sink for __ProtoTransport + where T: AsyncRead + AsyncWrite + 'static, + B: AsRef<[u8]> + 'static, +{ + type SinkItem = Frame<__ProtoResponse, B, ::Error>; + type SinkError = io::Error; + + #[inline] + fn start_send(&mut self, item: Self::SinkItem) + -> StartSend { + let item = match item { + Frame::Message { message, body } => { + Frame::Message { message: message.0, body: body } + } + Frame::Body { chunk } => Frame::Body { chunk: chunk }, + Frame::Error { error } => Frame::Error { error: error }, + }; + match try!(self.0.start_send(item)) { + AsyncSink::Ready => Ok(AsyncSink::Ready), + AsyncSink::NotReady(Frame::Message { message, body }) => { + Ok(AsyncSink::NotReady(Frame::Message { + message: __ProtoResponse(message), + body: body, + })) + } + AsyncSink::NotReady(Frame::Body { chunk }) => { + Ok(AsyncSink::NotReady(Frame::Body { chunk: chunk })) + } + AsyncSink::NotReady(Frame::Error { error }) => { + Ok(AsyncSink::NotReady(Frame::Error { error: error })) + } + } + } + + #[inline] + fn poll_complete(&mut self) -> Poll<(), io::Error> { + self.0.poll_complete() + } + + #[inline] + fn close(&mut self) -> Poll<(), io::Error> { + self.0.close() + } +} + +impl Stream for __ProtoTransport + where T: AsyncRead + AsyncWrite + 'static, + B: AsRef<[u8]> + 'static, +{ + type Item = Frame<__ProtoRequest, proto::Chunk, ::Error>; + type Error = io::Error; + + #[inline] + fn poll(&mut self) -> Poll, io::Error> { + let item = match try_ready!(self.0.poll()) { + Some(item) => item, + None => return Ok(None.into()), + }; + let item = match item { + Frame::Message { message, body } => { + Frame::Message { message: __ProtoRequest(message), body: body } + } + Frame::Body { chunk } => Frame::Body { chunk: chunk }, + Frame::Error { error } => Frame::Error { error: error }, + }; + Ok(Some(item).into()) + } +} + +impl Transport for __ProtoTransport + where T: AsyncRead + AsyncWrite + 'static, + B: AsRef<[u8]> + 'static, +{ + #[inline] + fn tick(&mut self) { + self.0.tick() + } + + #[inline] + fn cancel(&mut self) -> io::Result<()> { + self.0.cancel() + } +} + +impl Future for __ProtoBindTransport + where T: AsyncRead + AsyncWrite + 'static, +{ + type Item = __ProtoTransport; + type Error = io::Error; + + #[inline] + fn poll(&mut self) -> Poll<__ProtoTransport, io::Error> { + self.inner.poll().map(|a| a.map(__ProtoTransport)) + } +} + +impl From> for Request { + #[inline] + fn from(message: Message<__ProtoRequest, proto::TokioBody>) -> Request { + let (head, body) = match message { + Message::WithoutBody(head) => (head.0, proto::Body::empty()), + Message::WithBody(head, body) => (head.0, body.into()), + }; + request::from_wire(None, head, body) + } +} + +impl Into> for Response { + #[inline] + fn into(self) -> Message<__ProtoResponse, B> { + let (head, body) = response::split(self); + if let Some(body) = body { + Message::WithBody(__ProtoResponse(head), body.into()) + } else { + Message::WithoutBody(__ProtoResponse(head)) + } + } +} + +struct HttpService { + inner: T, + remote_addr: SocketAddr, +} + +impl Service for HttpService + where T: Service, Error=::Error>, + B: Stream, + B::Item: AsRef<[u8]>, +{ + type Request = Message<__ProtoRequest, proto::TokioBody>; + type Response = Message<__ProtoResponse, B>; + type Error = ::Error; + type Future = Map) -> Message<__ProtoResponse, B>>; + + #[inline] + fn call(&self, message: Self::Request) -> Self::Future { + let (head, body) = match message { + Message::WithoutBody(head) => (head.0, proto::Body::empty()), + Message::WithBody(head, body) => (head.0, body.into()), + }; + let req = request::from_wire(Some(self.remote_addr), head, body); + self.inner.call(req).map(Into::into) + } +}