feat(server): change Incoming to iterator over Connections
A connection is returned from Incoming.next(), and can be passed to a separate thread before any parsing happens. Call conn.open() to get a Result<(Request, Response)>. BREAKING CHANGE
This commit is contained in:
@@ -23,7 +23,8 @@ macro_rules! try_continue(
|
|||||||
}})
|
}})
|
||||||
|
|
||||||
fn handle(mut incoming: Incoming) {
|
fn handle(mut incoming: Incoming) {
|
||||||
for (_, res) in incoming {
|
for conn in incoming {
|
||||||
|
let (_, res) = try_continue!(conn.open());
|
||||||
let mut res = try_continue!(res.start());
|
let mut res = try_continue!(res.start());
|
||||||
try_continue!(res.write(b"Benchmarking hyper vs others!"))
|
try_continue!(res.write(b"Benchmarking hyper vs others!"))
|
||||||
try_continue!(res.end());
|
try_continue!(res.end());
|
||||||
|
|||||||
@@ -18,7 +18,8 @@ fn request(url: hyper::Url) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn hyper_handle(mut incoming: hyper::server::Incoming) {
|
fn hyper_handle(mut incoming: hyper::server::Incoming) {
|
||||||
for (_, mut res) in incoming {
|
for conn in incoming {
|
||||||
|
let (_, res) = conn.open().unwrap();
|
||||||
let mut res = res.start().unwrap();
|
let mut res = res.start().unwrap();
|
||||||
res.write(PHRASE).unwrap();
|
res.write(PHRASE).unwrap();
|
||||||
res.end().unwrap();
|
res.end().unwrap();
|
||||||
|
|||||||
@@ -11,21 +11,6 @@ use hyper::server::{Server, Handler, Incoming, Request, Response};
|
|||||||
use hyper::header::common::ContentLength;
|
use hyper::header::common::ContentLength;
|
||||||
use hyper::net::{HttpStream, HttpAcceptor, Fresh};
|
use hyper::net::{HttpStream, HttpAcceptor, Fresh};
|
||||||
|
|
||||||
trait ConcurrentHandler: Send + Sync {
|
|
||||||
fn handle(&self, req: Request, res: Response<Fresh>);
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Concurrent<H: ConcurrentHandler> { handler: Arc<H> }
|
|
||||||
|
|
||||||
impl<H: ConcurrentHandler> Handler<HttpAcceptor, HttpStream> for Concurrent<H> {
|
|
||||||
fn handle(self, mut incoming: Incoming) {
|
|
||||||
for (mut req, mut res) in incoming {
|
|
||||||
let clone = self.handler.clone();
|
|
||||||
spawn(proc() { clone.handle(req, res) })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! try_abort(
|
macro_rules! try_abort(
|
||||||
($e:expr) => {{
|
($e:expr) => {{
|
||||||
match $e {
|
match $e {
|
||||||
@@ -35,6 +20,24 @@ macro_rules! try_abort(
|
|||||||
}}
|
}}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
trait ConcurrentHandler: Send + Sync {
|
||||||
|
fn handle(&self, req: Request, res: Response<Fresh>);
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Concurrent<H: ConcurrentHandler> { handler: Arc<H> }
|
||||||
|
|
||||||
|
impl<H: ConcurrentHandler> Handler<HttpAcceptor, HttpStream> for Concurrent<H> {
|
||||||
|
fn handle(self, mut incoming: Incoming) {
|
||||||
|
for conn in incoming {
|
||||||
|
let clone = self.handler.clone();
|
||||||
|
spawn(proc() {
|
||||||
|
let (req, res) = try_abort!(conn.open());
|
||||||
|
clone.handle(req, res);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct Echo;
|
struct Echo;
|
||||||
|
|
||||||
impl ConcurrentHandler for Echo {
|
impl ConcurrentHandler for Echo {
|
||||||
|
|||||||
@@ -8,8 +8,9 @@ static PHRASE: &'static [u8] = b"Hello World!";
|
|||||||
fn hyper_handle(mut incoming: hyper::server::Incoming) {
|
fn hyper_handle(mut incoming: hyper::server::Incoming) {
|
||||||
let mut pool = TaskPool::new(100, || { proc(_) { } });
|
let mut pool = TaskPool::new(100, || { proc(_) { } });
|
||||||
|
|
||||||
for (_, mut res) in incoming {
|
for conn in incoming {
|
||||||
pool.execute(proc(_) {
|
pool.execute(proc(_) {
|
||||||
|
let (_, res) = conn.open().unwrap();
|
||||||
let mut res = res.start().unwrap();
|
let mut res = res.start().unwrap();
|
||||||
res.write(PHRASE).unwrap();
|
res.write(PHRASE).unwrap();
|
||||||
res.end().unwrap();
|
res.end().unwrap();
|
||||||
|
|||||||
@@ -19,7 +19,8 @@ macro_rules! try_continue(
|
|||||||
)
|
)
|
||||||
|
|
||||||
fn echo(mut incoming: Incoming) {
|
fn echo(mut incoming: Incoming) {
|
||||||
for (mut req, mut res) in incoming {
|
for conn in incoming {
|
||||||
|
let (mut req, mut res) = try_continue!(conn.open());
|
||||||
match req.uri {
|
match req.uri {
|
||||||
hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) {
|
hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) {
|
||||||
(&Get, "/") | (&Get, "/echo") => {
|
(&Get, "/") | (&Get, "/echo") => {
|
||||||
|
|||||||
@@ -89,29 +89,18 @@ impl<L: NetworkListener<S, A>, S: NetworkStream, A: NetworkAcceptor<S>> Server<L
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An iterator over incoming connections, represented as pairs of
|
/// An iterator over incoming `Connection`s.
|
||||||
/// hyper Requests and Responses.
|
|
||||||
pub struct Incoming<S: Send = HttpStream> {
|
pub struct Incoming<S: Send = HttpStream> {
|
||||||
from: Intertwined<IoResult<S>>
|
from: Intertwined<IoResult<S>>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: NetworkStream + 'static> Iterator<(Request, Response<Fresh>)> for Incoming<S> {
|
impl<S: NetworkStream + 'static> Iterator<Connection<S>> for Incoming<S> {
|
||||||
fn next(&mut self) -> Option<(Request, Response<Fresh>)> {
|
fn next(&mut self) -> Option<Connection<S>> {
|
||||||
for conn in self.from {
|
for conn in self.from {
|
||||||
match conn {
|
match conn {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
debug!("Incoming stream");
|
debug!("Incoming stream");
|
||||||
let clone = stream.clone();
|
return Some(Connection(stream));
|
||||||
let req = match Request::new(stream) {
|
|
||||||
Ok(r) => r,
|
|
||||||
Err(err) => {
|
|
||||||
error!("creating Request: {}", err);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let mut res = Response::new(clone);
|
|
||||||
res.version = req.version;
|
|
||||||
return Some((req, res))
|
|
||||||
},
|
},
|
||||||
Err(ref e) if e.kind == EndOfFile => return None, // server closed
|
Err(ref e) if e.kind == EndOfFile => return None, // server closed
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -124,6 +113,21 @@ impl<S: NetworkStream + 'static> Iterator<(Request, Response<Fresh>)> for Incomi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An incoming connection. It can be opened to receive a request/response pair.
|
||||||
|
pub struct Connection<S: Send = HttpStream>(S);
|
||||||
|
|
||||||
|
impl<S: NetworkStream + 'static> Connection<S> {
|
||||||
|
/// Opens the incoming connection, parsing it into a Request/Response pair.
|
||||||
|
pub fn open(self) -> HttpResult<(Request, Response<Fresh>)> {
|
||||||
|
let stream = self.0;
|
||||||
|
let clone = stream.clone();
|
||||||
|
let req = try!(Request::new(stream));
|
||||||
|
let mut res = Response::new(clone);
|
||||||
|
res.version = req.version;
|
||||||
|
return Ok((req, res))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A listening server, which can later be closed.
|
/// A listening server, which can later be closed.
|
||||||
pub struct Listening<A = HttpAcceptor> {
|
pub struct Listening<A = HttpAcceptor> {
|
||||||
acceptors: Vec<A>,
|
acceptors: Vec<A>,
|
||||||
|
|||||||
Reference in New Issue
Block a user