perf(http): reduce memcpy calls using boxed pimpl
All of the move semantics remain the same for http::Conn while the self consumption and move semantics only require a pointer copy now rather than copying larger amounts of data. This greatly improves the performance of hyper, by my measurements about 125% faster when benchmarking using wrk.
This commit is contained in:
committed by
Sean McArthur
parent
dabe3ac0b1
commit
13a6a59d9d
261
src/http/conn.rs
261
src/http/conn.rs
@@ -24,7 +24,13 @@ const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
|
|||||||
/// The connection will determine when a message begins and ends, creating
|
/// The connection will determine when a message begins and ends, creating
|
||||||
/// a new message `MessageHandler` for each one, as well as determine if this
|
/// a new message `MessageHandler` for each one, as well as determine if this
|
||||||
/// connection can be kept alive after the message, or if it is complete.
|
/// connection can be kept alive after the message, or if it is complete.
|
||||||
pub struct Conn<K: Key, T: Transport, H: MessageHandler<T>> {
|
pub struct Conn<K: Key, T: Transport, H: MessageHandler<T>>(Box<ConnInner<K, T, H>>);
|
||||||
|
|
||||||
|
|
||||||
|
/// `ConnInner` contains all of a connections state which Conn proxies for in a way
|
||||||
|
/// that allows Conn to maintain convenient move and self consuming method call
|
||||||
|
/// semantics but avoiding many costly memcpy calls.
|
||||||
|
struct ConnInner<K: Key, T: Transport, H: MessageHandler<T>> {
|
||||||
buf: Buffer,
|
buf: Buffer,
|
||||||
ctrl: (channel::Sender<Next>, channel::Receiver<Next>),
|
ctrl: (channel::Sender<Next>, channel::Receiver<Next>),
|
||||||
keep_alive_enabled: bool,
|
keep_alive_enabled: bool,
|
||||||
@@ -33,7 +39,7 @@ pub struct Conn<K: Key, T: Transport, H: MessageHandler<T>> {
|
|||||||
transport: T,
|
transport: T,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for Conn<K, T, H> {
|
impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for ConnInner<K, T, H> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
f.debug_struct("Conn")
|
f.debug_struct("Conn")
|
||||||
.field("keep_alive_enabled", &self.keep_alive_enabled)
|
.field("keep_alive_enabled", &self.keep_alive_enabled)
|
||||||
@@ -43,23 +49,14 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for Conn<K, T, H> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
|
impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for Conn<K, T, H> {
|
||||||
pub fn new(key: K, transport: T, notify: rotor::Notifier) -> Conn<K, T, H> {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
Conn {
|
self.0.fmt(f)
|
||||||
buf: Buffer::new(),
|
|
||||||
ctrl: channel::new(notify),
|
|
||||||
keep_alive_enabled: true,
|
|
||||||
key: key,
|
|
||||||
state: State::Init,
|
|
||||||
transport: transport,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn keep_alive(mut self, val: bool) -> Conn<K, T, H> {
|
|
||||||
self.keep_alive_enabled = val;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
|
impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
|
||||||
/// Desired Register interest based on state of current connection.
|
/// Desired Register interest based on state of current connection.
|
||||||
///
|
///
|
||||||
/// This includes the user interest, such as when they return `Next::read()`.
|
/// This includes the user interest, such as when they return `Next::read()`.
|
||||||
@@ -434,102 +431,6 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn ready<F>(mut self, events: EventSet, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
|
||||||
where F: MessageHandlerFactory<K, T, Output=H> {
|
|
||||||
trace!("Conn::ready events='{:?}', blocked={:?}", events, self.transport.blocked());
|
|
||||||
|
|
||||||
if events.is_error() {
|
|
||||||
match self.transport.take_socket_error() {
|
|
||||||
Ok(_) => {
|
|
||||||
trace!("is_error, but not socket error");
|
|
||||||
// spurious?
|
|
||||||
},
|
|
||||||
Err(e) => self.on_error(e.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the user had an io interest, but the transport was blocked differently,
|
|
||||||
// the event needs to be translated to what the user was actually expecting.
|
|
||||||
//
|
|
||||||
// Example:
|
|
||||||
// - User asks for `Next::write().
|
|
||||||
// - But transport is in the middle of renegotiating TLS, and is blocked on reading.
|
|
||||||
// - hyper should not wait on the `write` event, since epoll already
|
|
||||||
// knows it is writable. We would just loop a whole bunch, and slow down.
|
|
||||||
// - So instead, hyper waits on the event needed to unblock the transport, `read`.
|
|
||||||
// - Once epoll detects the transport is readable, it will alert hyper
|
|
||||||
// with a `readable` event.
|
|
||||||
// - hyper needs to translate that `readable` event back into a `write`,
|
|
||||||
// since that is actually what the Handler wants.
|
|
||||||
|
|
||||||
let events = if let Some(blocked) = self.transport.blocked() {
|
|
||||||
let interest = self.interest();
|
|
||||||
trace!("translating blocked={:?}, interest={:?}", blocked, interest);
|
|
||||||
match (blocked, interest) {
|
|
||||||
(Blocked::Read, Reg::Write) => EventSet::writable(),
|
|
||||||
(Blocked::Write, Reg::Read) => EventSet::readable(),
|
|
||||||
// otherwise, the transport was blocked on the same thing the user wanted
|
|
||||||
_ => events
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
events
|
|
||||||
};
|
|
||||||
|
|
||||||
if events.is_readable() {
|
|
||||||
self.on_readable(scope);
|
|
||||||
}
|
|
||||||
|
|
||||||
if events.is_writable() {
|
|
||||||
self.on_writable(scope);
|
|
||||||
}
|
|
||||||
|
|
||||||
let events = match self.register() {
|
|
||||||
Reg::Read => EventSet::readable(),
|
|
||||||
Reg::Write => EventSet::writable(),
|
|
||||||
Reg::ReadWrite => EventSet::readable() | EventSet::writable(),
|
|
||||||
Reg::Wait => EventSet::none(),
|
|
||||||
Reg::Remove => {
|
|
||||||
trace!("removing transport");
|
|
||||||
let _ = scope.deregister(&self.transport);
|
|
||||||
self.on_remove();
|
|
||||||
return None;
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
if events.is_readable() && self.can_read_more() {
|
|
||||||
return self.ready(events, scope);
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!("scope.reregister({:?})", events);
|
|
||||||
match scope.reregister(&self.transport, events, PollOpt::level()) {
|
|
||||||
Ok(..) => {
|
|
||||||
let timeout = self.state.timeout();
|
|
||||||
Some((self, timeout))
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
trace!("error reregistering: {:?}", e);
|
|
||||||
self.on_error(e.into());
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn wakeup<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
|
||||||
where F: MessageHandlerFactory<K, T, Output=H> {
|
|
||||||
while let Ok(next) = self.ctrl.1.try_recv() {
|
|
||||||
trace!("woke up with {:?}", next);
|
|
||||||
self.state.update(next);
|
|
||||||
}
|
|
||||||
self.ready(EventSet::readable() | EventSet::writable(), scope)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn timeout<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
|
||||||
where F: MessageHandlerFactory<K, T, Output=H> {
|
|
||||||
//TODO: check if this was a spurious timeout?
|
|
||||||
self.on_error(::Error::Timeout);
|
|
||||||
self.ready(EventSet::none(), scope)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_error(&mut self, err: ::Error) {
|
fn on_error(&mut self, err: ::Error) {
|
||||||
debug!("on_error err = {:?}", err);
|
debug!("on_error err = {:?}", err);
|
||||||
trace!("on_error state = {:?}", self.state);
|
trace!("on_error state = {:?}", self.state);
|
||||||
@@ -541,14 +442,6 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
|
|||||||
self.state.update(next);
|
self.state.update(next);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_remove(self) {
|
|
||||||
debug!("on_remove");
|
|
||||||
match self.state {
|
|
||||||
State::Init | State::Closed => (),
|
|
||||||
State::Http1(http1) => http1.handler.on_remove(self.transport),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_readable<F>(&mut self, scope: &mut Scope<F>)
|
fn on_readable<F>(&mut self, scope: &mut Scope<F>)
|
||||||
where F: MessageHandlerFactory<K, T, Output=H> {
|
where F: MessageHandlerFactory<K, T, Output=H> {
|
||||||
trace!("on_readable -> {:?}", self.state);
|
trace!("on_readable -> {:?}", self.state);
|
||||||
@@ -564,6 +457,134 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
|
|||||||
self.state = self.write(scope, state);
|
self.state = self.write(scope, state);
|
||||||
trace!("on_writable <- {:?}", self.state);
|
trace!("on_writable <- {:?}", self.state);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn on_remove(self) {
|
||||||
|
debug!("on_remove");
|
||||||
|
match self.state {
|
||||||
|
State::Init | State::Closed => (),
|
||||||
|
State::Http1(http1) => http1.handler.on_remove(self.transport),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
|
||||||
|
pub fn new(key: K, transport: T, notify: rotor::Notifier) -> Conn<K, T, H> {
|
||||||
|
Conn(Box::new(ConnInner {
|
||||||
|
buf: Buffer::new(),
|
||||||
|
ctrl: channel::new(notify),
|
||||||
|
keep_alive_enabled: true,
|
||||||
|
key: key,
|
||||||
|
state: State::Init,
|
||||||
|
transport: transport,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn keep_alive(mut self, val: bool) -> Conn<K, T, H> {
|
||||||
|
self.0.keep_alive_enabled = val;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn ready<F>(mut self, events: EventSet, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
||||||
|
where F: MessageHandlerFactory<K, T, Output=H> {
|
||||||
|
trace!("Conn::ready events='{:?}', blocked={:?}", events, self.0.transport.blocked());
|
||||||
|
|
||||||
|
if events.is_error() {
|
||||||
|
match self.0.transport.take_socket_error() {
|
||||||
|
Ok(_) => {
|
||||||
|
trace!("is_error, but not socket error");
|
||||||
|
// spurious?
|
||||||
|
},
|
||||||
|
Err(e) => self.0.on_error(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the user had an io interest, but the transport was blocked differently,
|
||||||
|
// the event needs to be translated to what the user was actually expecting.
|
||||||
|
//
|
||||||
|
// Example:
|
||||||
|
// - User asks for `Next::write().
|
||||||
|
// - But transport is in the middle of renegotiating TLS, and is blocked on reading.
|
||||||
|
// - hyper should not wait on the `write` event, since epoll already
|
||||||
|
// knows it is writable. We would just loop a whole bunch, and slow down.
|
||||||
|
// - So instead, hyper waits on the event needed to unblock the transport, `read`.
|
||||||
|
// - Once epoll detects the transport is readable, it will alert hyper
|
||||||
|
// with a `readable` event.
|
||||||
|
// - hyper needs to translate that `readable` event back into a `write`,
|
||||||
|
// since that is actually what the Handler wants.
|
||||||
|
|
||||||
|
let events = if let Some(blocked) = self.0.transport.blocked() {
|
||||||
|
let interest = self.0.interest();
|
||||||
|
trace!("translating blocked={:?}, interest={:?}", blocked, interest);
|
||||||
|
match (blocked, interest) {
|
||||||
|
(Blocked::Read, Reg::Write) => EventSet::writable(),
|
||||||
|
(Blocked::Write, Reg::Read) => EventSet::readable(),
|
||||||
|
// otherwise, the transport was blocked on the same thing the user wanted
|
||||||
|
_ => events
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
events
|
||||||
|
};
|
||||||
|
|
||||||
|
if events.is_readable() {
|
||||||
|
self.0.on_readable(scope);
|
||||||
|
}
|
||||||
|
|
||||||
|
if events.is_writable() {
|
||||||
|
self.0.on_writable(scope);
|
||||||
|
}
|
||||||
|
|
||||||
|
let events = match self.0.register() {
|
||||||
|
Reg::Read => EventSet::readable(),
|
||||||
|
Reg::Write => EventSet::writable(),
|
||||||
|
Reg::ReadWrite => EventSet::readable() | EventSet::writable(),
|
||||||
|
Reg::Wait => EventSet::none(),
|
||||||
|
Reg::Remove => {
|
||||||
|
trace!("removing transport");
|
||||||
|
let _ = scope.deregister(&self.0.transport);
|
||||||
|
self.on_remove();
|
||||||
|
return None;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
if events.is_readable() && self.0.can_read_more() {
|
||||||
|
return self.ready(events, scope);
|
||||||
|
}
|
||||||
|
|
||||||
|
trace!("scope.reregister({:?})", events);
|
||||||
|
match scope.reregister(&self.0.transport, events, PollOpt::level()) {
|
||||||
|
Ok(..) => {
|
||||||
|
let timeout = self.0.state.timeout();
|
||||||
|
Some((self, timeout))
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
trace!("error reregistering: {:?}", e);
|
||||||
|
self.0.on_error(e.into());
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn wakeup<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
||||||
|
where F: MessageHandlerFactory<K, T, Output=H> {
|
||||||
|
while let Ok(next) = self.0.ctrl.1.try_recv() {
|
||||||
|
trace!("woke up with {:?}", next);
|
||||||
|
self.0.state.update(next);
|
||||||
|
}
|
||||||
|
self.ready(EventSet::readable() | EventSet::writable(), scope)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn timeout<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
||||||
|
where F: MessageHandlerFactory<K, T, Output=H> {
|
||||||
|
//TODO: check if this was a spurious timeout?
|
||||||
|
self.0.on_error(::Error::Timeout);
|
||||||
|
self.ready(EventSet::none(), scope)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_remove(self) {
|
||||||
|
self.0.on_remove()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
enum State<H: MessageHandler<T>, T: Transport> {
|
enum State<H: MessageHandler<T>, T: Transport> {
|
||||||
|
|||||||
Reference in New Issue
Block a user