chore(server): setup ServerProto pieces to be deprecated
- Adds a `server-proto` feature that is added to default features. - If `server-proto` feature is not enabled, pieces that will eventually be deprecated and optional will be tagged deprecated, but with a note about the missing `server-proto` feature.
This commit is contained in:
@@ -43,7 +43,8 @@ pretty_env_logger = "0.1"
|
|||||||
spmc = "0.2"
|
spmc = "0.2"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = ["server-proto"]
|
||||||
nightly = []
|
nightly = []
|
||||||
raw_status = []
|
raw_status = []
|
||||||
compat = [ "http" ]
|
compat = [ "http" ]
|
||||||
|
server-proto = []
|
||||||
|
|||||||
20
src/lib.rs
20
src/lib.rs
@@ -39,8 +39,6 @@ extern crate unicase;
|
|||||||
#[cfg(all(test, feature = "nightly"))]
|
#[cfg(all(test, feature = "nightly"))]
|
||||||
extern crate test;
|
extern crate test;
|
||||||
|
|
||||||
|
|
||||||
mod common;
|
|
||||||
pub use uri::Uri;
|
pub use uri::Uri;
|
||||||
pub use client::Client;
|
pub use client::Client;
|
||||||
pub use error::{Result, Error};
|
pub use error::{Result, Error};
|
||||||
@@ -55,6 +53,24 @@ pub use version::HttpVersion;
|
|||||||
#[cfg(feature = "raw_status")]
|
#[cfg(feature = "raw_status")]
|
||||||
pub use proto::RawStatus;
|
pub use proto::RawStatus;
|
||||||
|
|
||||||
|
macro_rules! feat_server_proto {
|
||||||
|
($($i:item)*) => ($(
|
||||||
|
#[cfg_attr(
|
||||||
|
not(feature = "server-proto"),
|
||||||
|
deprecated(
|
||||||
|
since="0.11.7",
|
||||||
|
note="server-proto was recently added to default features, but you have disabled default features. A future version will remove these types if the server-proto feature is not enabled."
|
||||||
|
)
|
||||||
|
)]
|
||||||
|
#[cfg_attr(
|
||||||
|
not(feature = "server-proto"),
|
||||||
|
allow(deprecated)
|
||||||
|
)]
|
||||||
|
$i
|
||||||
|
)*)
|
||||||
|
}
|
||||||
|
|
||||||
|
mod common;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod mock;
|
mod mock;
|
||||||
pub mod client;
|
pub mod client;
|
||||||
|
|||||||
@@ -47,6 +47,8 @@ impl Stream for Body {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deprecate soon, but can't really deprecate trait impls
|
||||||
|
#[doc(hidden)]
|
||||||
impl From<Body> for tokio_proto::streaming::Body<Chunk, ::Error> {
|
impl From<Body> for tokio_proto::streaming::Body<Chunk, ::Error> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from(b: Body) -> tokio_proto::streaming::Body<Chunk, ::Error> {
|
fn from(b: Body) -> tokio_proto::streaming::Body<Chunk, ::Error> {
|
||||||
@@ -54,6 +56,8 @@ impl From<Body> for tokio_proto::streaming::Body<Chunk, ::Error> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deprecate soon, but can't really deprecate trait impls
|
||||||
|
#[doc(hidden)]
|
||||||
impl From<tokio_proto::streaming::Body<Chunk, ::Error>> for Body {
|
impl From<tokio_proto::streaming::Body<Chunk, ::Error>> for Body {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from(tokio_body: tokio_proto::streaming::Body<Chunk, ::Error>) -> Body {
|
fn from(tokio_body: tokio_proto::streaming::Body<Chunk, ::Error>) -> Body {
|
||||||
|
|||||||
@@ -17,8 +17,8 @@ use std::rc::{Rc, Weak};
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures::task::{self, Task};
|
use futures::task::{self, Task};
|
||||||
use futures::future::{self, Map};
|
use futures::future::{self};
|
||||||
use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
|
use futures::{Future, Stream, Poll, Async};
|
||||||
|
|
||||||
#[cfg(feature = "compat")]
|
#[cfg(feature = "compat")]
|
||||||
use http;
|
use http;
|
||||||
@@ -26,14 +26,9 @@ use http;
|
|||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
use tokio::reactor::{Core, Handle, Timeout};
|
use tokio::reactor::{Core, Handle, Timeout};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio_proto::BindServer;
|
|
||||||
use tokio_proto::streaming::Message;
|
|
||||||
use tokio_proto::streaming::pipeline::{Transport, Frame, ServerProto};
|
|
||||||
pub use tokio_service::{NewService, Service};
|
pub use tokio_service::{NewService, Service};
|
||||||
|
|
||||||
use proto;
|
use proto;
|
||||||
use proto::response;
|
|
||||||
use proto::request;
|
|
||||||
#[cfg(feature = "compat")]
|
#[cfg(feature = "compat")]
|
||||||
use proto::Body;
|
use proto::Body;
|
||||||
use self::hyper_service::HyperService;
|
use self::hyper_service::HyperService;
|
||||||
@@ -41,6 +36,16 @@ use self::hyper_service::HyperService;
|
|||||||
pub use proto::response::Response;
|
pub use proto::response::Response;
|
||||||
pub use proto::request::Request;
|
pub use proto::request::Request;
|
||||||
|
|
||||||
|
feat_server_proto! {
|
||||||
|
mod server_proto;
|
||||||
|
pub use self::server_proto::{
|
||||||
|
__ProtoRequest,
|
||||||
|
__ProtoResponse,
|
||||||
|
__ProtoTransport,
|
||||||
|
__ProtoBindTransport,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/// An instance of the HTTP protocol, and implementation of tokio-proto's
|
/// An instance of the HTTP protocol, and implementation of tokio-proto's
|
||||||
/// `ServerProto` trait.
|
/// `ServerProto` trait.
|
||||||
///
|
///
|
||||||
@@ -92,7 +97,7 @@ pub struct SpawnAll<I, S, E> {
|
|||||||
/// A stream of connections from binding to an address.
|
/// A stream of connections from binding to an address.
|
||||||
#[must_use = "streams do nothing unless polled"]
|
#[must_use = "streams do nothing unless polled"]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct AddrStream {
|
pub struct AddrIncoming {
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
listener: TcpListener,
|
listener: TcpListener,
|
||||||
}
|
}
|
||||||
@@ -119,6 +124,10 @@ where
|
|||||||
|
|
||||||
// ===== impl Http =====
|
// ===== impl Http =====
|
||||||
|
|
||||||
|
// This is wrapped in this macro because using `Http` as a `ServerProto` will
|
||||||
|
// never trigger a deprecation warning, so we have to annoy more people to
|
||||||
|
// protect some others.
|
||||||
|
feat_server_proto! {
|
||||||
impl<B: AsRef<[u8]> + 'static> Http<B> {
|
impl<B: AsRef<[u8]> + 'static> Http<B> {
|
||||||
/// Creates a new instance of the HTTP protocol, ready to spawn a server or
|
/// Creates a new instance of the HTTP protocol, ready to spawn a server or
|
||||||
/// start accepting connections.
|
/// start accepting connections.
|
||||||
@@ -129,7 +138,10 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
|
|||||||
_marker: PhantomData,
|
_marker: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: AsRef<[u8]> + 'static> Http<B> {
|
||||||
/// Enables or disables HTTP keep-alive.
|
/// Enables or disables HTTP keep-alive.
|
||||||
///
|
///
|
||||||
/// Default is true.
|
/// Default is true.
|
||||||
@@ -189,55 +201,6 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
|
|||||||
self.bind(addr, self::compat_impl::new_service(new_service))
|
self.bind(addr, self::compat_impl::new_service(new_service))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Use this `Http` instance to create a new server task which handles the
|
|
||||||
/// connection `io` provided.
|
|
||||||
///
|
|
||||||
/// This is the low-level method used to actually spawn handling a TCP
|
|
||||||
/// connection, typically. The `handle` provided is the event loop on which
|
|
||||||
/// the server task will be spawned, `io` is the I/O object associated with
|
|
||||||
/// this connection (data that's read/written), `remote_addr` is the remote
|
|
||||||
/// peer address of the HTTP client, and `service` defines how HTTP requests
|
|
||||||
/// will be handled (and mapped to responses).
|
|
||||||
///
|
|
||||||
/// This method is typically not invoked directly but is rather transitively
|
|
||||||
/// used through [`bind`](#method.bind). This can be useful,
|
|
||||||
/// however, when writing mocks or accepting sockets from a non-TCP
|
|
||||||
/// location.
|
|
||||||
pub fn bind_connection<S, I, Bd>(&self,
|
|
||||||
handle: &Handle,
|
|
||||||
io: I,
|
|
||||||
remote_addr: SocketAddr,
|
|
||||||
service: S)
|
|
||||||
where S: Service<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static,
|
|
||||||
Bd: Stream<Item=B, Error=::Error> + 'static,
|
|
||||||
I: AsyncRead + AsyncWrite + 'static,
|
|
||||||
{
|
|
||||||
self.bind_server(handle, io, HttpService {
|
|
||||||
inner: service,
|
|
||||||
remote_addr: remote_addr,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// Bind a `Service` using types from the `http` crate.
|
|
||||||
///
|
|
||||||
/// See `Http::bind_connection`.
|
|
||||||
#[cfg(feature = "compat")]
|
|
||||||
pub fn bind_connection_compat<S, I, Bd>(&self,
|
|
||||||
handle: &Handle,
|
|
||||||
io: I,
|
|
||||||
remote_addr: SocketAddr,
|
|
||||||
service: S)
|
|
||||||
where S: Service<Request = http::Request<Body>, Response = http::Response<Bd>, Error = ::Error> + 'static,
|
|
||||||
Bd: Stream<Item=B, Error=::Error> + 'static,
|
|
||||||
I: AsyncRead + AsyncWrite + 'static,
|
|
||||||
{
|
|
||||||
self.bind_server(handle, io, HttpService {
|
|
||||||
inner: compat_impl::service(service),
|
|
||||||
remote_addr: remote_addr,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This method allows the ability to share a `Core` with multiple servers.
|
/// This method allows the ability to share a `Core` with multiple servers.
|
||||||
///
|
///
|
||||||
/// Bind the provided `addr` and return a server with a shared `Core`.
|
/// Bind the provided `addr` and return a server with a shared `Core`.
|
||||||
@@ -246,12 +209,12 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
|
|||||||
/// to accept connections. Each connection will be processed with the
|
/// to accept connections. Each connection will be processed with the
|
||||||
/// `new_service` object provided as well, creating a new service per
|
/// `new_service` object provided as well, creating a new service per
|
||||||
/// connection.
|
/// connection.
|
||||||
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrStream, S>>
|
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
|
||||||
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error>,
|
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error>,
|
||||||
Bd: Stream<Item=B, Error=::Error>,
|
Bd: Stream<Item=B, Error=::Error>,
|
||||||
{
|
{
|
||||||
let listener = TcpListener::bind(addr, &handle)?;
|
let listener = TcpListener::bind(addr, &handle)?;
|
||||||
let incoming = AddrStream {
|
let incoming = AddrIncoming {
|
||||||
addr: listener.local_addr()?,
|
addr: listener.local_addr()?,
|
||||||
listener: listener,
|
listener: listener,
|
||||||
};
|
};
|
||||||
@@ -319,192 +282,7 @@ impl<B> fmt::Debug for Http<B> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
pub struct __ProtoRequest(proto::RequestHead);
|
|
||||||
#[doc(hidden)]
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
pub struct __ProtoResponse(proto::MessageHead<::StatusCode>);
|
|
||||||
#[doc(hidden)]
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
pub struct __ProtoTransport<T, B>(proto::Conn<T, B, proto::ServerTransaction>);
|
|
||||||
#[doc(hidden)]
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
pub struct __ProtoBindTransport<T, B> {
|
|
||||||
inner: future::FutureResult<proto::Conn<T, B, proto::ServerTransaction>, io::Error>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, B> ServerProto<T> for Http<B>
|
|
||||||
where T: AsyncRead + AsyncWrite + 'static,
|
|
||||||
B: AsRef<[u8]> + 'static,
|
|
||||||
{
|
|
||||||
type Request = __ProtoRequest;
|
|
||||||
type RequestBody = proto::Chunk;
|
|
||||||
type Response = __ProtoResponse;
|
|
||||||
type ResponseBody = B;
|
|
||||||
type Error = ::Error;
|
|
||||||
type Transport = __ProtoTransport<T, B>;
|
|
||||||
type BindTransport = __ProtoBindTransport<T, B>;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn bind_transport(&self, io: T) -> Self::BindTransport {
|
|
||||||
let ka = if self.keep_alive {
|
|
||||||
proto::KA::Busy
|
|
||||||
} else {
|
|
||||||
proto::KA::Disabled
|
|
||||||
};
|
|
||||||
let mut conn = proto::Conn::new(io, ka);
|
|
||||||
conn.set_flush_pipeline(self.pipeline);
|
|
||||||
__ProtoBindTransport {
|
|
||||||
inner: future::ok(conn),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, B> Sink for __ProtoTransport<T, B>
|
|
||||||
where T: AsyncRead + AsyncWrite + 'static,
|
|
||||||
B: AsRef<[u8]> + 'static,
|
|
||||||
{
|
|
||||||
type SinkItem = Frame<__ProtoResponse, B, ::Error>;
|
|
||||||
type SinkError = io::Error;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn start_send(&mut self, item: Self::SinkItem)
|
|
||||||
-> StartSend<Self::SinkItem, io::Error> {
|
|
||||||
let item = match item {
|
|
||||||
Frame::Message { message, body } => {
|
|
||||||
Frame::Message { message: message.0, body: body }
|
|
||||||
}
|
|
||||||
Frame::Body { chunk } => Frame::Body { chunk: chunk },
|
|
||||||
Frame::Error { error } => Frame::Error { error: error },
|
|
||||||
};
|
|
||||||
match try!(self.0.start_send(item)) {
|
|
||||||
AsyncSink::Ready => Ok(AsyncSink::Ready),
|
|
||||||
AsyncSink::NotReady(Frame::Message { message, body }) => {
|
|
||||||
Ok(AsyncSink::NotReady(Frame::Message {
|
|
||||||
message: __ProtoResponse(message),
|
|
||||||
body: body,
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
AsyncSink::NotReady(Frame::Body { chunk }) => {
|
|
||||||
Ok(AsyncSink::NotReady(Frame::Body { chunk: chunk }))
|
|
||||||
}
|
|
||||||
AsyncSink::NotReady(Frame::Error { error }) => {
|
|
||||||
Ok(AsyncSink::NotReady(Frame::Error { error: error }))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn poll_complete(&mut self) -> Poll<(), io::Error> {
|
|
||||||
self.0.poll_complete()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn close(&mut self) -> Poll<(), io::Error> {
|
|
||||||
self.0.close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, B> Stream for __ProtoTransport<T, B>
|
|
||||||
where T: AsyncRead + AsyncWrite + 'static,
|
|
||||||
B: AsRef<[u8]> + 'static,
|
|
||||||
{
|
|
||||||
type Item = Frame<__ProtoRequest, proto::Chunk, ::Error>;
|
|
||||||
type Error = io::Error;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
|
|
||||||
let item = match try_ready!(self.0.poll()) {
|
|
||||||
Some(item) => item,
|
|
||||||
None => return Ok(None.into()),
|
|
||||||
};
|
|
||||||
let item = match item {
|
|
||||||
Frame::Message { message, body } => {
|
|
||||||
Frame::Message { message: __ProtoRequest(message), body: body }
|
|
||||||
}
|
|
||||||
Frame::Body { chunk } => Frame::Body { chunk: chunk },
|
|
||||||
Frame::Error { error } => Frame::Error { error: error },
|
|
||||||
};
|
|
||||||
Ok(Some(item).into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, B> Transport for __ProtoTransport<T, B>
|
|
||||||
where T: AsyncRead + AsyncWrite + 'static,
|
|
||||||
B: AsRef<[u8]> + 'static,
|
|
||||||
{
|
|
||||||
#[inline]
|
|
||||||
fn tick(&mut self) {
|
|
||||||
self.0.tick()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn cancel(&mut self) -> io::Result<()> {
|
|
||||||
self.0.cancel()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, B> Future for __ProtoBindTransport<T, B>
|
|
||||||
where T: AsyncRead + AsyncWrite + 'static,
|
|
||||||
{
|
|
||||||
type Item = __ProtoTransport<T, B>;
|
|
||||||
type Error = io::Error;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn poll(&mut self) -> Poll<__ProtoTransport<T, B>, io::Error> {
|
|
||||||
self.inner.poll().map(|a| a.map(__ProtoTransport))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<Message<__ProtoRequest, proto::TokioBody>> for Request {
|
|
||||||
#[inline]
|
|
||||||
fn from(message: Message<__ProtoRequest, proto::TokioBody>) -> Request {
|
|
||||||
let (head, body) = match message {
|
|
||||||
Message::WithoutBody(head) => (head.0, proto::Body::empty()),
|
|
||||||
Message::WithBody(head, body) => (head.0, body.into()),
|
|
||||||
};
|
|
||||||
request::from_wire(None, head, body)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<B> Into<Message<__ProtoResponse, B>> for Response<B> {
|
|
||||||
#[inline]
|
|
||||||
fn into(self) -> Message<__ProtoResponse, B> {
|
|
||||||
let (head, body) = response::split(self);
|
|
||||||
if let Some(body) = body {
|
|
||||||
Message::WithBody(__ProtoResponse(head), body.into())
|
|
||||||
} else {
|
|
||||||
Message::WithoutBody(__ProtoResponse(head))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct HttpService<T> {
|
|
||||||
inner: T,
|
|
||||||
remote_addr: SocketAddr,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, B> Service for HttpService<T>
|
|
||||||
where T: Service<Request=Request, Response=Response<B>, Error=::Error>,
|
|
||||||
B: Stream<Error=::Error>,
|
|
||||||
B::Item: AsRef<[u8]>,
|
|
||||||
{
|
|
||||||
type Request = Message<__ProtoRequest, proto::TokioBody>;
|
|
||||||
type Response = Message<__ProtoResponse, B>;
|
|
||||||
type Error = ::Error;
|
|
||||||
type Future = Map<T::Future, fn(Response<B>) -> Message<__ProtoResponse, B>>;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn call(&self, message: Self::Request) -> Self::Future {
|
|
||||||
let (head, body) = match message {
|
|
||||||
Message::WithoutBody(head) => (head.0, proto::Body::empty()),
|
|
||||||
Message::WithBody(head, body) => (head.0, body.into()),
|
|
||||||
};
|
|
||||||
let req = request::from_wire(Some(self.remote_addr), head, body);
|
|
||||||
self.inner.call(req).map(Into::into)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===== impl Server =====
|
// ===== impl Server =====
|
||||||
|
|
||||||
@@ -590,6 +368,7 @@ impl<S, B> Server<S, B>
|
|||||||
.map_err(|err| error!("no_proto error: {}", err));
|
.map_err(|err| error!("no_proto error: {}", err));
|
||||||
handle.spawn(fut);
|
handle.spawn(fut);
|
||||||
} else {
|
} else {
|
||||||
|
#[allow(deprecated)]
|
||||||
protocol.bind_connection(&handle, socket, addr, s);
|
protocol.bind_connection(&handle, socket, addr, s);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -775,16 +554,16 @@ mod unnameable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl AddrStream =====
|
// ===== impl AddrIncoming =====
|
||||||
|
|
||||||
impl AddrStream {
|
impl AddrIncoming {
|
||||||
/// Get the local address bound to this listener.
|
/// Get the local address bound to this listener.
|
||||||
pub fn local_addr(&self) -> SocketAddr {
|
pub fn local_addr(&self) -> SocketAddr {
|
||||||
self.addr
|
self.addr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream for AddrStream {
|
impl Stream for AddrIncoming {
|
||||||
type Item = TcpStream;
|
type Item = TcpStream;
|
||||||
type Error = ::std::io::Error;
|
type Error = ::std::io::Error;
|
||||||
|
|
||||||
|
|||||||
265
src/server/server_proto.rs
Normal file
265
src/server/server_proto.rs
Normal file
@@ -0,0 +1,265 @@
|
|||||||
|
//! The tokio-proto `ServerProto` machinery.
|
||||||
|
//!
|
||||||
|
//! Not to be confused with `hyper::proto`.
|
||||||
|
//!
|
||||||
|
//! Will be deprecated soon.
|
||||||
|
|
||||||
|
use std::io;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
#[cfg(feature = "compat")]
|
||||||
|
use http;
|
||||||
|
use futures::future::{self, Map};
|
||||||
|
use futures::{Future, Stream, Poll, Sink, StartSend, AsyncSink};
|
||||||
|
use tokio::reactor::Handle;
|
||||||
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio_proto::BindServer;
|
||||||
|
use tokio_proto::streaming::Message;
|
||||||
|
use tokio_proto::streaming::pipeline::{Transport, Frame, ServerProto};
|
||||||
|
use tokio_service::Service;
|
||||||
|
|
||||||
|
use {Request, Response};
|
||||||
|
use proto::{self, request, response};
|
||||||
|
#[cfg(feature = "compat")]
|
||||||
|
use proto::Body;
|
||||||
|
#[cfg(feature = "compat")]
|
||||||
|
use super::compat_impl;
|
||||||
|
use super::Http;
|
||||||
|
|
||||||
|
impl<B: AsRef<[u8]> + 'static> Http<B> {
|
||||||
|
/// Use this `Http` instance to create a new server task which handles the
|
||||||
|
/// connection `io` provided.
|
||||||
|
///
|
||||||
|
/// This is the low-level method used to actually spawn handling a TCP
|
||||||
|
/// connection, typically. The `handle` provided is the event loop on which
|
||||||
|
/// the server task will be spawned, `io` is the I/O object associated with
|
||||||
|
/// this connection (data that's read/written), `remote_addr` is the remote
|
||||||
|
/// peer address of the HTTP client, and `service` defines how HTTP requests
|
||||||
|
/// will be handled (and mapped to responses).
|
||||||
|
///
|
||||||
|
/// This method is typically not invoked directly but is rather transitively
|
||||||
|
/// used through [`bind`](#method.bind). This can be useful,
|
||||||
|
/// however, when writing mocks or accepting sockets from a non-TCP
|
||||||
|
/// location.
|
||||||
|
pub fn bind_connection<S, I, Bd>(&self,
|
||||||
|
handle: &Handle,
|
||||||
|
io: I,
|
||||||
|
remote_addr: SocketAddr,
|
||||||
|
service: S)
|
||||||
|
where S: Service<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static,
|
||||||
|
Bd: Stream<Item=B, Error=::Error> + 'static,
|
||||||
|
I: AsyncRead + AsyncWrite + 'static,
|
||||||
|
{
|
||||||
|
self.bind_server(handle, io, HttpService {
|
||||||
|
inner: service,
|
||||||
|
remote_addr: remote_addr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Bind a `Service` using types from the `http` crate.
|
||||||
|
///
|
||||||
|
/// See `Http::bind_connection`.
|
||||||
|
#[cfg(feature = "compat")]
|
||||||
|
pub fn bind_connection_compat<S, I, Bd>(&self,
|
||||||
|
handle: &Handle,
|
||||||
|
io: I,
|
||||||
|
remote_addr: SocketAddr,
|
||||||
|
service: S)
|
||||||
|
where S: Service<Request = http::Request<Body>, Response = http::Response<Bd>, Error = ::Error> + 'static,
|
||||||
|
Bd: Stream<Item=B, Error=::Error> + 'static,
|
||||||
|
I: AsyncRead + AsyncWrite + 'static,
|
||||||
|
{
|
||||||
|
self.bind_server(handle, io, HttpService {
|
||||||
|
inner: compat_impl::service(service),
|
||||||
|
remote_addr: remote_addr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct __ProtoRequest(proto::RequestHead);
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct __ProtoResponse(proto::MessageHead<::StatusCode>);
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct __ProtoTransport<T, B>(proto::Conn<T, B, proto::ServerTransaction>);
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct __ProtoBindTransport<T, B> {
|
||||||
|
inner: future::FutureResult<proto::Conn<T, B, proto::ServerTransaction>, io::Error>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, B> ServerProto<T> for Http<B>
|
||||||
|
where T: AsyncRead + AsyncWrite + 'static,
|
||||||
|
B: AsRef<[u8]> + 'static,
|
||||||
|
{
|
||||||
|
type Request = __ProtoRequest;
|
||||||
|
type RequestBody = proto::Chunk;
|
||||||
|
type Response = __ProtoResponse;
|
||||||
|
type ResponseBody = B;
|
||||||
|
type Error = ::Error;
|
||||||
|
type Transport = __ProtoTransport<T, B>;
|
||||||
|
type BindTransport = __ProtoBindTransport<T, B>;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn bind_transport(&self, io: T) -> Self::BindTransport {
|
||||||
|
let ka = if self.keep_alive {
|
||||||
|
proto::KA::Busy
|
||||||
|
} else {
|
||||||
|
proto::KA::Disabled
|
||||||
|
};
|
||||||
|
let mut conn = proto::Conn::new(io, ka);
|
||||||
|
conn.set_flush_pipeline(self.pipeline);
|
||||||
|
__ProtoBindTransport {
|
||||||
|
inner: future::ok(conn),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, B> Sink for __ProtoTransport<T, B>
|
||||||
|
where T: AsyncRead + AsyncWrite + 'static,
|
||||||
|
B: AsRef<[u8]> + 'static,
|
||||||
|
{
|
||||||
|
type SinkItem = Frame<__ProtoResponse, B, ::Error>;
|
||||||
|
type SinkError = io::Error;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn start_send(&mut self, item: Self::SinkItem)
|
||||||
|
-> StartSend<Self::SinkItem, io::Error> {
|
||||||
|
let item = match item {
|
||||||
|
Frame::Message { message, body } => {
|
||||||
|
Frame::Message { message: message.0, body: body }
|
||||||
|
}
|
||||||
|
Frame::Body { chunk } => Frame::Body { chunk: chunk },
|
||||||
|
Frame::Error { error } => Frame::Error { error: error },
|
||||||
|
};
|
||||||
|
match try!(self.0.start_send(item)) {
|
||||||
|
AsyncSink::Ready => Ok(AsyncSink::Ready),
|
||||||
|
AsyncSink::NotReady(Frame::Message { message, body }) => {
|
||||||
|
Ok(AsyncSink::NotReady(Frame::Message {
|
||||||
|
message: __ProtoResponse(message),
|
||||||
|
body: body,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
AsyncSink::NotReady(Frame::Body { chunk }) => {
|
||||||
|
Ok(AsyncSink::NotReady(Frame::Body { chunk: chunk }))
|
||||||
|
}
|
||||||
|
AsyncSink::NotReady(Frame::Error { error }) => {
|
||||||
|
Ok(AsyncSink::NotReady(Frame::Error { error: error }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn poll_complete(&mut self) -> Poll<(), io::Error> {
|
||||||
|
self.0.poll_complete()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn close(&mut self) -> Poll<(), io::Error> {
|
||||||
|
self.0.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, B> Stream for __ProtoTransport<T, B>
|
||||||
|
where T: AsyncRead + AsyncWrite + 'static,
|
||||||
|
B: AsRef<[u8]> + 'static,
|
||||||
|
{
|
||||||
|
type Item = Frame<__ProtoRequest, proto::Chunk, ::Error>;
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
|
||||||
|
let item = match try_ready!(self.0.poll()) {
|
||||||
|
Some(item) => item,
|
||||||
|
None => return Ok(None.into()),
|
||||||
|
};
|
||||||
|
let item = match item {
|
||||||
|
Frame::Message { message, body } => {
|
||||||
|
Frame::Message { message: __ProtoRequest(message), body: body }
|
||||||
|
}
|
||||||
|
Frame::Body { chunk } => Frame::Body { chunk: chunk },
|
||||||
|
Frame::Error { error } => Frame::Error { error: error },
|
||||||
|
};
|
||||||
|
Ok(Some(item).into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, B> Transport for __ProtoTransport<T, B>
|
||||||
|
where T: AsyncRead + AsyncWrite + 'static,
|
||||||
|
B: AsRef<[u8]> + 'static,
|
||||||
|
{
|
||||||
|
#[inline]
|
||||||
|
fn tick(&mut self) {
|
||||||
|
self.0.tick()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn cancel(&mut self) -> io::Result<()> {
|
||||||
|
self.0.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, B> Future for __ProtoBindTransport<T, B>
|
||||||
|
where T: AsyncRead + AsyncWrite + 'static,
|
||||||
|
{
|
||||||
|
type Item = __ProtoTransport<T, B>;
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn poll(&mut self) -> Poll<__ProtoTransport<T, B>, io::Error> {
|
||||||
|
self.inner.poll().map(|a| a.map(__ProtoTransport))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Message<__ProtoRequest, proto::TokioBody>> for Request {
|
||||||
|
#[inline]
|
||||||
|
fn from(message: Message<__ProtoRequest, proto::TokioBody>) -> Request {
|
||||||
|
let (head, body) = match message {
|
||||||
|
Message::WithoutBody(head) => (head.0, proto::Body::empty()),
|
||||||
|
Message::WithBody(head, body) => (head.0, body.into()),
|
||||||
|
};
|
||||||
|
request::from_wire(None, head, body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B> Into<Message<__ProtoResponse, B>> for Response<B> {
|
||||||
|
#[inline]
|
||||||
|
fn into(self) -> Message<__ProtoResponse, B> {
|
||||||
|
let (head, body) = response::split(self);
|
||||||
|
if let Some(body) = body {
|
||||||
|
Message::WithBody(__ProtoResponse(head), body.into())
|
||||||
|
} else {
|
||||||
|
Message::WithoutBody(__ProtoResponse(head))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct HttpService<T> {
|
||||||
|
inner: T,
|
||||||
|
remote_addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, B> Service for HttpService<T>
|
||||||
|
where T: Service<Request=Request, Response=Response<B>, Error=::Error>,
|
||||||
|
B: Stream<Error=::Error>,
|
||||||
|
B::Item: AsRef<[u8]>,
|
||||||
|
{
|
||||||
|
type Request = Message<__ProtoRequest, proto::TokioBody>;
|
||||||
|
type Response = Message<__ProtoResponse, B>;
|
||||||
|
type Error = ::Error;
|
||||||
|
type Future = Map<T::Future, fn(Response<B>) -> Message<__ProtoResponse, B>>;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn call(&self, message: Self::Request) -> Self::Future {
|
||||||
|
let (head, body) = match message {
|
||||||
|
Message::WithoutBody(head) => (head.0, proto::Body::empty()),
|
||||||
|
Message::WithBody(head, body) => (head.0, body.into()),
|
||||||
|
};
|
||||||
|
let req = request::from_wire(Some(self.remote_addr), head, body);
|
||||||
|
self.inner.call(req).map(Into::into)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user