feat(client): add http1_writev configuration option
Setting this to false will force HTTP/1 connections to always flatten all buffers (headers and body) before writing to the transport. The default is true.
This commit is contained in:
@@ -42,6 +42,7 @@ pub mod compat;
|
|||||||
pub struct Client<C, B = proto::Body> {
|
pub struct Client<C, B = proto::Body> {
|
||||||
connector: C,
|
connector: C,
|
||||||
executor: Exec,
|
executor: Exec,
|
||||||
|
h1_writev: bool,
|
||||||
pool: Pool<HyperClient<B>>,
|
pool: Pool<HyperClient<B>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -96,6 +97,7 @@ impl<C, B> Client<C, B> {
|
|||||||
Client {
|
Client {
|
||||||
connector: config.connector,
|
connector: config.connector,
|
||||||
executor: exec,
|
executor: exec,
|
||||||
|
h1_writev: config.h1_writev,
|
||||||
pool: Pool::new(config.keep_alive, config.keep_alive_timeout)
|
pool: Pool::new(config.keep_alive, config.keep_alive_timeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -197,6 +199,7 @@ where C: Connect,
|
|||||||
let executor = self.executor.clone();
|
let executor = self.executor.clone();
|
||||||
let pool = self.pool.clone();
|
let pool = self.pool.clone();
|
||||||
let pool_key = Rc::new(domain.to_string());
|
let pool_key = Rc::new(domain.to_string());
|
||||||
|
let h1_writev = self.h1_writev;
|
||||||
self.connector.connect(url)
|
self.connector.connect(url)
|
||||||
.and_then(move |io| {
|
.and_then(move |io| {
|
||||||
let (tx, rx) = dispatch::channel();
|
let (tx, rx) = dispatch::channel();
|
||||||
@@ -205,7 +208,10 @@ where C: Connect,
|
|||||||
should_close: Cell::new(true),
|
should_close: Cell::new(true),
|
||||||
};
|
};
|
||||||
let pooled = pool.pooled(pool_key, tx);
|
let pooled = pool.pooled(pool_key, tx);
|
||||||
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
|
let mut conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
|
||||||
|
if !h1_writev {
|
||||||
|
conn.set_write_strategy_flatten();
|
||||||
|
}
|
||||||
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
|
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
|
||||||
executor.execute(dispatch.map_err(|e| debug!("client connection error: {}", e)))?;
|
executor.execute(dispatch.map_err(|e| debug!("client connection error: {}", e)))?;
|
||||||
Ok(pooled)
|
Ok(pooled)
|
||||||
@@ -256,6 +262,7 @@ impl<C: Clone, B> Clone for Client<C, B> {
|
|||||||
Client {
|
Client {
|
||||||
connector: self.connector.clone(),
|
connector: self.connector.clone(),
|
||||||
executor: self.executor.clone(),
|
executor: self.executor.clone(),
|
||||||
|
h1_writev: self.h1_writev,
|
||||||
pool: self.pool.clone(),
|
pool: self.pool.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -307,9 +314,9 @@ pub struct Config<C, B> {
|
|||||||
connector: C,
|
connector: C,
|
||||||
keep_alive: bool,
|
keep_alive: bool,
|
||||||
keep_alive_timeout: Option<Duration>,
|
keep_alive_timeout: Option<Duration>,
|
||||||
|
h1_writev: bool,
|
||||||
//TODO: make use of max_idle config
|
//TODO: make use of max_idle config
|
||||||
max_idle: usize,
|
max_idle: usize,
|
||||||
no_proto: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Phantom type used to signal that `Config` should create a `HttpConnector`.
|
/// Phantom type used to signal that `Config` should create a `HttpConnector`.
|
||||||
@@ -324,8 +331,8 @@ impl Default for Config<UseDefaultConnector, proto::Body> {
|
|||||||
connector: UseDefaultConnector(()),
|
connector: UseDefaultConnector(()),
|
||||||
keep_alive: true,
|
keep_alive: true,
|
||||||
keep_alive_timeout: Some(Duration::from_secs(90)),
|
keep_alive_timeout: Some(Duration::from_secs(90)),
|
||||||
|
h1_writev: true,
|
||||||
max_idle: 5,
|
max_idle: 5,
|
||||||
no_proto: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -348,8 +355,8 @@ impl<C, B> Config<C, B> {
|
|||||||
connector: self.connector,
|
connector: self.connector,
|
||||||
keep_alive: self.keep_alive,
|
keep_alive: self.keep_alive,
|
||||||
keep_alive_timeout: self.keep_alive_timeout,
|
keep_alive_timeout: self.keep_alive_timeout,
|
||||||
|
h1_writev: self.h1_writev,
|
||||||
max_idle: self.max_idle,
|
max_idle: self.max_idle,
|
||||||
no_proto: self.no_proto,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -362,8 +369,8 @@ impl<C, B> Config<C, B> {
|
|||||||
connector: val,
|
connector: val,
|
||||||
keep_alive: self.keep_alive,
|
keep_alive: self.keep_alive,
|
||||||
keep_alive_timeout: self.keep_alive_timeout,
|
keep_alive_timeout: self.keep_alive_timeout,
|
||||||
|
h1_writev: self.h1_writev,
|
||||||
max_idle: self.max_idle,
|
max_idle: self.max_idle,
|
||||||
no_proto: self.no_proto,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -398,6 +405,20 @@ impl<C, B> Config<C, B> {
|
|||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/// Set whether HTTP/1 connections should try to use vectored writes,
|
||||||
|
/// or always flatten into a single buffer.
|
||||||
|
///
|
||||||
|
/// Note that setting this to false may mean more copies of body data,
|
||||||
|
/// but may also improve performance when an IO transport doesn't
|
||||||
|
/// support vectored writes well, such as most TLS implementations.
|
||||||
|
///
|
||||||
|
/// Default is true.
|
||||||
|
#[inline]
|
||||||
|
pub fn http1_writev(mut self, val: bool) -> Config<C, B> {
|
||||||
|
self.h1_writev = val;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
#[deprecated(since="0.11.11", note="no_proto is always enabled")]
|
#[deprecated(since="0.11.11", note="no_proto is always enabled")]
|
||||||
pub fn no_proto(self) -> Config<C, B> {
|
pub fn no_proto(self) -> Config<C, B> {
|
||||||
@@ -444,6 +465,7 @@ impl<C, B> fmt::Debug for Config<C, B> {
|
|||||||
f.debug_struct("Config")
|
f.debug_struct("Config")
|
||||||
.field("keep_alive", &self.keep_alive)
|
.field("keep_alive", &self.keep_alive)
|
||||||
.field("keep_alive_timeout", &self.keep_alive_timeout)
|
.field("keep_alive_timeout", &self.keep_alive_timeout)
|
||||||
|
.field("http1_writev", &self.h1_writev)
|
||||||
.field("max_idle", &self.max_idle)
|
.field("max_idle", &self.max_idle)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -62,6 +62,10 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
self.io.set_max_buf_size(max);
|
self.io.set_max_buf_size(max);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_write_strategy_flatten(&mut self) {
|
||||||
|
self.io.set_write_strategy_flatten();
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "tokio-proto")]
|
#[cfg(feature = "tokio-proto")]
|
||||||
fn poll_incoming(&mut self) -> Poll<Option<Frame<MessageHead<T::Incoming>, Chunk, ::Error>>, io::Error> {
|
fn poll_incoming(&mut self) -> Poll<Option<Frame<MessageHead<T::Incoming>, Chunk, ::Error>>, io::Error> {
|
||||||
trace!("Conn::poll_incoming()");
|
trace!("Conn::poll_incoming()");
|
||||||
|
|||||||
@@ -65,6 +65,13 @@ where
|
|||||||
self.write_buf.max_buf_size = max;
|
self.write_buf.max_buf_size = max;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_write_strategy_flatten(&mut self) {
|
||||||
|
// this should always be called only at construction time,
|
||||||
|
// so this assert is here to catch myself
|
||||||
|
debug_assert!(self.write_buf.buf.bufs.is_empty());
|
||||||
|
self.write_buf.set_strategy(Strategy::Flatten);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn read_buf(&self) -> &[u8] {
|
pub fn read_buf(&self) -> &[u8] {
|
||||||
self.read_buf.as_ref()
|
self.read_buf.as_ref()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user