Merge pull request #829 from hyperium/822
perf(http): reduce memcpy calls using boxed pimpl
This commit is contained in:
259
src/http/conn.rs
259
src/http/conn.rs
@@ -24,7 +24,13 @@ const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
|
||||
/// The connection will determine when a message begins and ends, creating
|
||||
/// a new message `MessageHandler` for each one, as well as determine if this
|
||||
/// connection can be kept alive after the message, or if it is complete.
|
||||
pub struct Conn<K: Key, T: Transport, H: MessageHandler<T>> {
|
||||
pub struct Conn<K: Key, T: Transport, H: MessageHandler<T>>(Box<ConnInner<K, T, H>>);
|
||||
|
||||
|
||||
/// `ConnInner` contains all of a connections state which Conn proxies for in a way
|
||||
/// that allows Conn to maintain convenient move and self consuming method call
|
||||
/// semantics but avoiding many costly memcpy calls.
|
||||
struct ConnInner<K: Key, T: Transport, H: MessageHandler<T>> {
|
||||
buf: Buffer,
|
||||
ctrl: (channel::Sender<Next>, channel::Receiver<Next>),
|
||||
keep_alive_enabled: bool,
|
||||
@@ -33,7 +39,7 @@ pub struct Conn<K: Key, T: Transport, H: MessageHandler<T>> {
|
||||
transport: T,
|
||||
}
|
||||
|
||||
impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for Conn<K, T, H> {
|
||||
impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for ConnInner<K, T, H> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("Conn")
|
||||
.field("keep_alive_enabled", &self.keep_alive_enabled)
|
||||
@@ -43,23 +49,14 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for Conn<K, T, H> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
|
||||
pub fn new(key: K, transport: T, notify: rotor::Notifier) -> Conn<K, T, H> {
|
||||
Conn {
|
||||
buf: Buffer::new(),
|
||||
ctrl: channel::new(notify),
|
||||
keep_alive_enabled: true,
|
||||
key: key,
|
||||
state: State::Init,
|
||||
transport: transport,
|
||||
impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for Conn<K, T, H> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn keep_alive(mut self, val: bool) -> Conn<K, T, H> {
|
||||
self.keep_alive_enabled = val;
|
||||
self
|
||||
}
|
||||
|
||||
impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
|
||||
/// Desired Register interest based on state of current connection.
|
||||
///
|
||||
/// This includes the user interest, such as when they return `Next::read()`.
|
||||
@@ -434,102 +431,6 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ready<F>(mut self, events: EventSet, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
||||
where F: MessageHandlerFactory<K, T, Output=H> {
|
||||
trace!("Conn::ready events='{:?}', blocked={:?}", events, self.transport.blocked());
|
||||
|
||||
if events.is_error() {
|
||||
match self.transport.take_socket_error() {
|
||||
Ok(_) => {
|
||||
trace!("is_error, but not socket error");
|
||||
// spurious?
|
||||
},
|
||||
Err(e) => self.on_error(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
// if the user had an io interest, but the transport was blocked differently,
|
||||
// the event needs to be translated to what the user was actually expecting.
|
||||
//
|
||||
// Example:
|
||||
// - User asks for `Next::write().
|
||||
// - But transport is in the middle of renegotiating TLS, and is blocked on reading.
|
||||
// - hyper should not wait on the `write` event, since epoll already
|
||||
// knows it is writable. We would just loop a whole bunch, and slow down.
|
||||
// - So instead, hyper waits on the event needed to unblock the transport, `read`.
|
||||
// - Once epoll detects the transport is readable, it will alert hyper
|
||||
// with a `readable` event.
|
||||
// - hyper needs to translate that `readable` event back into a `write`,
|
||||
// since that is actually what the Handler wants.
|
||||
|
||||
let events = if let Some(blocked) = self.transport.blocked() {
|
||||
let interest = self.interest();
|
||||
trace!("translating blocked={:?}, interest={:?}", blocked, interest);
|
||||
match (blocked, interest) {
|
||||
(Blocked::Read, Reg::Write) => EventSet::writable(),
|
||||
(Blocked::Write, Reg::Read) => EventSet::readable(),
|
||||
// otherwise, the transport was blocked on the same thing the user wanted
|
||||
_ => events
|
||||
}
|
||||
} else {
|
||||
events
|
||||
};
|
||||
|
||||
if events.is_readable() {
|
||||
self.on_readable(scope);
|
||||
}
|
||||
|
||||
if events.is_writable() {
|
||||
self.on_writable(scope);
|
||||
}
|
||||
|
||||
let events = match self.register() {
|
||||
Reg::Read => EventSet::readable(),
|
||||
Reg::Write => EventSet::writable(),
|
||||
Reg::ReadWrite => EventSet::readable() | EventSet::writable(),
|
||||
Reg::Wait => EventSet::none(),
|
||||
Reg::Remove => {
|
||||
trace!("removing transport");
|
||||
let _ = scope.deregister(&self.transport);
|
||||
self.on_remove();
|
||||
return None;
|
||||
},
|
||||
};
|
||||
|
||||
if events.is_readable() && self.can_read_more() {
|
||||
return self.ready(events, scope);
|
||||
}
|
||||
|
||||
trace!("scope.reregister({:?})", events);
|
||||
match scope.reregister(&self.transport, events, PollOpt::level()) {
|
||||
Ok(..) => {
|
||||
let timeout = self.state.timeout();
|
||||
Some((self, timeout))
|
||||
},
|
||||
Err(e) => {
|
||||
trace!("error reregistering: {:?}", e);
|
||||
self.on_error(e.into());
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wakeup<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
||||
where F: MessageHandlerFactory<K, T, Output=H> {
|
||||
while let Ok(next) = self.ctrl.1.try_recv() {
|
||||
trace!("woke up with {:?}", next);
|
||||
self.state.update(next);
|
||||
}
|
||||
self.ready(EventSet::readable() | EventSet::writable(), scope)
|
||||
}
|
||||
|
||||
pub fn timeout<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
||||
where F: MessageHandlerFactory<K, T, Output=H> {
|
||||
//TODO: check if this was a spurious timeout?
|
||||
self.on_error(::Error::Timeout);
|
||||
self.ready(EventSet::none(), scope)
|
||||
}
|
||||
|
||||
fn on_error(&mut self, err: ::Error) {
|
||||
debug!("on_error err = {:?}", err);
|
||||
trace!("on_error state = {:?}", self.state);
|
||||
@@ -541,14 +442,6 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
|
||||
self.state.update(next);
|
||||
}
|
||||
|
||||
fn on_remove(self) {
|
||||
debug!("on_remove");
|
||||
match self.state {
|
||||
State::Init | State::Closed => (),
|
||||
State::Http1(http1) => http1.handler.on_remove(self.transport),
|
||||
}
|
||||
}
|
||||
|
||||
fn on_readable<F>(&mut self, scope: &mut Scope<F>)
|
||||
where F: MessageHandlerFactory<K, T, Output=H> {
|
||||
trace!("on_readable -> {:?}", self.state);
|
||||
@@ -564,6 +457,134 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
|
||||
self.state = self.write(scope, state);
|
||||
trace!("on_writable <- {:?}", self.state);
|
||||
}
|
||||
|
||||
fn on_remove(self) {
|
||||
debug!("on_remove");
|
||||
match self.state {
|
||||
State::Init | State::Closed => (),
|
||||
State::Http1(http1) => http1.handler.on_remove(self.transport),
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
|
||||
pub fn new(key: K, transport: T, notify: rotor::Notifier) -> Conn<K, T, H> {
|
||||
Conn(Box::new(ConnInner {
|
||||
buf: Buffer::new(),
|
||||
ctrl: channel::new(notify),
|
||||
keep_alive_enabled: true,
|
||||
key: key,
|
||||
state: State::Init,
|
||||
transport: transport,
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn keep_alive(mut self, val: bool) -> Conn<K, T, H> {
|
||||
self.0.keep_alive_enabled = val;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn ready<F>(mut self, events: EventSet, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
||||
where F: MessageHandlerFactory<K, T, Output=H> {
|
||||
trace!("Conn::ready events='{:?}', blocked={:?}", events, self.0.transport.blocked());
|
||||
|
||||
if events.is_error() {
|
||||
match self.0.transport.take_socket_error() {
|
||||
Ok(_) => {
|
||||
trace!("is_error, but not socket error");
|
||||
// spurious?
|
||||
},
|
||||
Err(e) => self.0.on_error(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
// if the user had an io interest, but the transport was blocked differently,
|
||||
// the event needs to be translated to what the user was actually expecting.
|
||||
//
|
||||
// Example:
|
||||
// - User asks for `Next::write().
|
||||
// - But transport is in the middle of renegotiating TLS, and is blocked on reading.
|
||||
// - hyper should not wait on the `write` event, since epoll already
|
||||
// knows it is writable. We would just loop a whole bunch, and slow down.
|
||||
// - So instead, hyper waits on the event needed to unblock the transport, `read`.
|
||||
// - Once epoll detects the transport is readable, it will alert hyper
|
||||
// with a `readable` event.
|
||||
// - hyper needs to translate that `readable` event back into a `write`,
|
||||
// since that is actually what the Handler wants.
|
||||
|
||||
let events = if let Some(blocked) = self.0.transport.blocked() {
|
||||
let interest = self.0.interest();
|
||||
trace!("translating blocked={:?}, interest={:?}", blocked, interest);
|
||||
match (blocked, interest) {
|
||||
(Blocked::Read, Reg::Write) => EventSet::writable(),
|
||||
(Blocked::Write, Reg::Read) => EventSet::readable(),
|
||||
// otherwise, the transport was blocked on the same thing the user wanted
|
||||
_ => events
|
||||
}
|
||||
} else {
|
||||
events
|
||||
};
|
||||
|
||||
if events.is_readable() {
|
||||
self.0.on_readable(scope);
|
||||
}
|
||||
|
||||
if events.is_writable() {
|
||||
self.0.on_writable(scope);
|
||||
}
|
||||
|
||||
let events = match self.0.register() {
|
||||
Reg::Read => EventSet::readable(),
|
||||
Reg::Write => EventSet::writable(),
|
||||
Reg::ReadWrite => EventSet::readable() | EventSet::writable(),
|
||||
Reg::Wait => EventSet::none(),
|
||||
Reg::Remove => {
|
||||
trace!("removing transport");
|
||||
let _ = scope.deregister(&self.0.transport);
|
||||
self.on_remove();
|
||||
return None;
|
||||
},
|
||||
};
|
||||
|
||||
if events.is_readable() && self.0.can_read_more() {
|
||||
return self.ready(events, scope);
|
||||
}
|
||||
|
||||
trace!("scope.reregister({:?})", events);
|
||||
match scope.reregister(&self.0.transport, events, PollOpt::level()) {
|
||||
Ok(..) => {
|
||||
let timeout = self.0.state.timeout();
|
||||
Some((self, timeout))
|
||||
},
|
||||
Err(e) => {
|
||||
trace!("error reregistering: {:?}", e);
|
||||
self.0.on_error(e.into());
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wakeup<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
||||
where F: MessageHandlerFactory<K, T, Output=H> {
|
||||
while let Ok(next) = self.0.ctrl.1.try_recv() {
|
||||
trace!("woke up with {:?}", next);
|
||||
self.0.state.update(next);
|
||||
}
|
||||
self.ready(EventSet::readable() | EventSet::writable(), scope)
|
||||
}
|
||||
|
||||
pub fn timeout<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
|
||||
where F: MessageHandlerFactory<K, T, Output=H> {
|
||||
//TODO: check if this was a spurious timeout?
|
||||
self.0.on_error(::Error::Timeout);
|
||||
self.ready(EventSet::none(), scope)
|
||||
}
|
||||
|
||||
fn on_remove(self) {
|
||||
self.0.on_remove()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
enum State<H: MessageHandler<T>, T: Transport> {
|
||||
|
||||
Reference in New Issue
Block a user