feat(server): add experimental pipeline flush aggregation option to Http
By enabling `Http::pipeline`, the connection will aggregate response writes to try to improve sending more responses in a single syscall.
This commit is contained in:
@@ -47,6 +47,9 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_flush_pipeline(&mut self, enabled: bool) {
|
||||||
|
self.io.set_flush_pipeline(enabled);
|
||||||
|
}
|
||||||
|
|
||||||
fn poll2(&mut self) -> Poll<Option<Frame<http::MessageHead<T::Incoming>, http::Chunk, ::Error>>, io::Error> {
|
fn poll2(&mut self) -> Poll<Option<Frame<http::MessageHead<T::Incoming>, http::Chunk, ::Error>>, io::Error> {
|
||||||
trace!("Conn::poll()");
|
trace!("Conn::poll()");
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ const INIT_BUFFER_SIZE: usize = 8192;
|
|||||||
pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
|
pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
|
||||||
|
|
||||||
pub struct Buffered<T> {
|
pub struct Buffered<T> {
|
||||||
|
flush_pipeline: bool,
|
||||||
io: T,
|
io: T,
|
||||||
read_blocked: bool,
|
read_blocked: bool,
|
||||||
read_buf: BytesMut,
|
read_buf: BytesMut,
|
||||||
@@ -31,6 +32,7 @@ impl<T> fmt::Debug for Buffered<T> {
|
|||||||
impl<T: AsyncRead + AsyncWrite> Buffered<T> {
|
impl<T: AsyncRead + AsyncWrite> Buffered<T> {
|
||||||
pub fn new(io: T) -> Buffered<T> {
|
pub fn new(io: T) -> Buffered<T> {
|
||||||
Buffered {
|
Buffered {
|
||||||
|
flush_pipeline: false,
|
||||||
io: io,
|
io: io,
|
||||||
read_buf: BytesMut::with_capacity(0),
|
read_buf: BytesMut::with_capacity(0),
|
||||||
write_buf: WriteBuf::new(),
|
write_buf: WriteBuf::new(),
|
||||||
@@ -38,6 +40,10 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_flush_pipeline(&mut self, enabled: bool) {
|
||||||
|
self.flush_pipeline = enabled;
|
||||||
|
}
|
||||||
|
|
||||||
pub fn read_buf(&self) -> &[u8] {
|
pub fn read_buf(&self) -> &[u8] {
|
||||||
self.read_buf.as_ref()
|
self.read_buf.as_ref()
|
||||||
}
|
}
|
||||||
@@ -139,7 +145,9 @@ impl<T: Write> Write for Buffered<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
if self.write_buf.remaining() == 0 {
|
if self.flush_pipeline && self.read_buf.is_empty() {
|
||||||
|
Ok(())
|
||||||
|
} else if self.write_buf.remaining() == 0 {
|
||||||
self.io.flush()
|
self.io.flush()
|
||||||
} else {
|
} else {
|
||||||
loop {
|
loop {
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ pub use http::request::Request;
|
|||||||
/// configured with various protocol-level options such as keepalive.
|
/// configured with various protocol-level options such as keepalive.
|
||||||
pub struct Http<B = ::Chunk> {
|
pub struct Http<B = ::Chunk> {
|
||||||
keep_alive: bool,
|
keep_alive: bool,
|
||||||
|
pipeline: bool,
|
||||||
_marker: PhantomData<B>,
|
_marker: PhantomData<B>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -73,6 +74,7 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
|
|||||||
pub fn new() -> Http<B> {
|
pub fn new() -> Http<B> {
|
||||||
Http {
|
Http {
|
||||||
keep_alive: true,
|
keep_alive: true,
|
||||||
|
pipeline: false,
|
||||||
_marker: PhantomData,
|
_marker: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -85,6 +87,16 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Aggregates flushes to better support pipelined responses.
|
||||||
|
///
|
||||||
|
/// Experimental, may be have bugs.
|
||||||
|
///
|
||||||
|
/// Default is false.
|
||||||
|
pub fn pipeline(&mut self, enabled: bool) -> &mut Self {
|
||||||
|
self.pipeline = enabled;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Bind the provided `addr` and return a server ready to handle
|
/// Bind the provided `addr` and return a server ready to handle
|
||||||
/// connections.
|
/// connections.
|
||||||
///
|
///
|
||||||
@@ -185,6 +197,7 @@ impl<B> fmt::Debug for Http<B> {
|
|||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
f.debug_struct("Http")
|
f.debug_struct("Http")
|
||||||
.field("keep_alive", &self.keep_alive)
|
.field("keep_alive", &self.keep_alive)
|
||||||
|
.field("pipeline", &self.pipeline)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -223,8 +236,10 @@ impl<T, B> ServerProto<T> for Http<B>
|
|||||||
} else {
|
} else {
|
||||||
http::KA::Disabled
|
http::KA::Disabled
|
||||||
};
|
};
|
||||||
|
let mut conn = http::Conn::new(io, ka);
|
||||||
|
conn.set_flush_pipeline(self.pipeline);
|
||||||
__ProtoBindTransport {
|
__ProtoBindTransport {
|
||||||
inner: future::ok(http::Conn::new(io, ka)),
|
inner: future::ok(conn),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -437,6 +437,58 @@ fn expect_continue() {
|
|||||||
assert_eq!(body, msg);
|
assert_eq!(body, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn pipline_disabled() {
|
||||||
|
let server = serve();
|
||||||
|
let mut req = connect(server.addr());
|
||||||
|
server.reply().status(hyper::Ok);
|
||||||
|
server.reply().status(hyper::Ok);
|
||||||
|
|
||||||
|
req.write_all(b"\
|
||||||
|
GET / HTTP/1.1\r\n\
|
||||||
|
Host: example.domain\r\n\
|
||||||
|
\r\n\
|
||||||
|
GET / HTTP/1.1\r\n\
|
||||||
|
Host: example.domain\r\n\
|
||||||
|
\r\n\
|
||||||
|
").expect("write 1");
|
||||||
|
|
||||||
|
let mut buf = vec![0; 4096];
|
||||||
|
let n = req.read(&mut buf).expect("read 1");
|
||||||
|
assert_ne!(n, 0);
|
||||||
|
let n = req.read(&mut buf).expect("read 2");
|
||||||
|
assert_ne!(n, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn pipeline_enabled() {
|
||||||
|
let server = serve_with_options(ServeOptions {
|
||||||
|
pipeline: true,
|
||||||
|
.. Default::default()
|
||||||
|
});
|
||||||
|
let mut req = connect(server.addr());
|
||||||
|
server.reply().status(hyper::Ok);
|
||||||
|
server.reply().status(hyper::Ok);
|
||||||
|
|
||||||
|
req.write_all(b"\
|
||||||
|
GET / HTTP/1.1\r\n\
|
||||||
|
Host: example.domain\r\n\
|
||||||
|
\r\n\
|
||||||
|
GET / HTTP/1.1\r\n\
|
||||||
|
Host: example.domain\r\n\
|
||||||
|
Connection: close\r\n\
|
||||||
|
\r\n\
|
||||||
|
").expect("write 1");
|
||||||
|
|
||||||
|
let mut buf = vec![0; 4096];
|
||||||
|
let n = req.read(&mut buf).expect("read 1");
|
||||||
|
assert_ne!(n, 0);
|
||||||
|
// with pipeline enabled, both responses should have been in the first read
|
||||||
|
// so a second read should be EOF
|
||||||
|
let n = req.read(&mut buf).expect("read 2");
|
||||||
|
assert_eq!(n, 0);
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------
|
// -------------------------------------------------
|
||||||
// the Server that is used to run all the tests with
|
// the Server that is used to run all the tests with
|
||||||
// -------------------------------------------------
|
// -------------------------------------------------
|
||||||
@@ -577,6 +629,7 @@ fn serve() -> Serve {
|
|||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct ServeOptions {
|
struct ServeOptions {
|
||||||
keep_alive_disabled: bool,
|
keep_alive_disabled: bool,
|
||||||
|
pipeline: bool,
|
||||||
timeout: Option<Duration>,
|
timeout: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -591,15 +644,19 @@ fn serve_with_options(options: ServeOptions) -> Serve {
|
|||||||
let addr = "127.0.0.1:0".parse().unwrap();
|
let addr = "127.0.0.1:0".parse().unwrap();
|
||||||
|
|
||||||
let keep_alive = !options.keep_alive_disabled;
|
let keep_alive = !options.keep_alive_disabled;
|
||||||
|
let pipeline = options.pipeline;
|
||||||
let dur = options.timeout;
|
let dur = options.timeout;
|
||||||
|
|
||||||
let thread_name = format!("test-server-{:?}", dur);
|
let thread_name = format!("test-server-{:?}", dur);
|
||||||
let thread = thread::Builder::new().name(thread_name).spawn(move || {
|
let thread = thread::Builder::new().name(thread_name).spawn(move || {
|
||||||
let srv = Http::new().keep_alive(keep_alive).bind(&addr, TestService {
|
let srv = Http::new()
|
||||||
tx: Arc::new(Mutex::new(msg_tx.clone())),
|
.keep_alive(keep_alive)
|
||||||
_timeout: dur,
|
.pipeline(pipeline)
|
||||||
reply: reply_rx,
|
.bind(&addr, TestService {
|
||||||
}).unwrap();
|
tx: Arc::new(Mutex::new(msg_tx.clone())),
|
||||||
|
_timeout: dur,
|
||||||
|
reply: reply_rx,
|
||||||
|
}).unwrap();
|
||||||
addr_tx.send(srv.local_addr().unwrap()).unwrap();
|
addr_tx.send(srv.local_addr().unwrap()).unwrap();
|
||||||
srv.run_until(shutdown_rx.then(|_| Ok(()))).unwrap();
|
srv.run_until(shutdown_rx.then(|_| Ok(()))).unwrap();
|
||||||
}).unwrap();
|
}).unwrap();
|
||||||
|
|||||||
Reference in New Issue
Block a user