diff --git a/src/http/conn.rs b/src/http/conn.rs index dc15e958..17d09b1a 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -24,7 +24,13 @@ const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; /// The connection will determine when a message begins and ends, creating /// a new message `MessageHandler` for each one, as well as determine if this /// connection can be kept alive after the message, or if it is complete. -pub struct Conn> { +pub struct Conn>(Box>); + + +/// `ConnInner` contains all of a connections state which Conn proxies for in a way +/// that allows Conn to maintain convenient move and self consuming method call +/// semantics but avoiding many costly memcpy calls. +struct ConnInner> { buf: Buffer, ctrl: (channel::Sender, channel::Receiver), keep_alive_enabled: bool, @@ -33,7 +39,7 @@ pub struct Conn> { transport: T, } -impl> fmt::Debug for Conn { +impl> fmt::Debug for ConnInner { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Conn") .field("keep_alive_enabled", &self.keep_alive_enabled) @@ -43,23 +49,14 @@ impl> fmt::Debug for Conn { } } -impl> Conn { - pub fn new(key: K, transport: T, notify: rotor::Notifier) -> Conn { - Conn { - buf: Buffer::new(), - ctrl: channel::new(notify), - keep_alive_enabled: true, - key: key, - state: State::Init, - transport: transport, - } +impl> fmt::Debug for Conn { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) } +} - pub fn keep_alive(mut self, val: bool) -> Conn { - self.keep_alive_enabled = val; - self - } +impl> ConnInner { /// Desired Register interest based on state of current connection. /// /// This includes the user interest, such as when they return `Next::read()`. @@ -434,102 +431,6 @@ impl> Conn { } } - pub fn ready(mut self, events: EventSet, scope: &mut Scope) -> Option<(Self, Option)> - where F: MessageHandlerFactory { - trace!("Conn::ready events='{:?}', blocked={:?}", events, self.transport.blocked()); - - if events.is_error() { - match self.transport.take_socket_error() { - Ok(_) => { - trace!("is_error, but not socket error"); - // spurious? - }, - Err(e) => self.on_error(e.into()) - } - } - - // if the user had an io interest, but the transport was blocked differently, - // the event needs to be translated to what the user was actually expecting. - // - // Example: - // - User asks for `Next::write(). - // - But transport is in the middle of renegotiating TLS, and is blocked on reading. - // - hyper should not wait on the `write` event, since epoll already - // knows it is writable. We would just loop a whole bunch, and slow down. - // - So instead, hyper waits on the event needed to unblock the transport, `read`. - // - Once epoll detects the transport is readable, it will alert hyper - // with a `readable` event. - // - hyper needs to translate that `readable` event back into a `write`, - // since that is actually what the Handler wants. - - let events = if let Some(blocked) = self.transport.blocked() { - let interest = self.interest(); - trace!("translating blocked={:?}, interest={:?}", blocked, interest); - match (blocked, interest) { - (Blocked::Read, Reg::Write) => EventSet::writable(), - (Blocked::Write, Reg::Read) => EventSet::readable(), - // otherwise, the transport was blocked on the same thing the user wanted - _ => events - } - } else { - events - }; - - if events.is_readable() { - self.on_readable(scope); - } - - if events.is_writable() { - self.on_writable(scope); - } - - let events = match self.register() { - Reg::Read => EventSet::readable(), - Reg::Write => EventSet::writable(), - Reg::ReadWrite => EventSet::readable() | EventSet::writable(), - Reg::Wait => EventSet::none(), - Reg::Remove => { - trace!("removing transport"); - let _ = scope.deregister(&self.transport); - self.on_remove(); - return None; - }, - }; - - if events.is_readable() && self.can_read_more() { - return self.ready(events, scope); - } - - trace!("scope.reregister({:?})", events); - match scope.reregister(&self.transport, events, PollOpt::level()) { - Ok(..) => { - let timeout = self.state.timeout(); - Some((self, timeout)) - }, - Err(e) => { - trace!("error reregistering: {:?}", e); - self.on_error(e.into()); - None - } - } - } - - pub fn wakeup(mut self, scope: &mut Scope) -> Option<(Self, Option)> - where F: MessageHandlerFactory { - while let Ok(next) = self.ctrl.1.try_recv() { - trace!("woke up with {:?}", next); - self.state.update(next); - } - self.ready(EventSet::readable() | EventSet::writable(), scope) - } - - pub fn timeout(mut self, scope: &mut Scope) -> Option<(Self, Option)> - where F: MessageHandlerFactory { - //TODO: check if this was a spurious timeout? - self.on_error(::Error::Timeout); - self.ready(EventSet::none(), scope) - } - fn on_error(&mut self, err: ::Error) { debug!("on_error err = {:?}", err); trace!("on_error state = {:?}", self.state); @@ -541,14 +442,6 @@ impl> Conn { self.state.update(next); } - fn on_remove(self) { - debug!("on_remove"); - match self.state { - State::Init | State::Closed => (), - State::Http1(http1) => http1.handler.on_remove(self.transport), - } - } - fn on_readable(&mut self, scope: &mut Scope) where F: MessageHandlerFactory { trace!("on_readable -> {:?}", self.state); @@ -564,6 +457,134 @@ impl> Conn { self.state = self.write(scope, state); trace!("on_writable <- {:?}", self.state); } + + fn on_remove(self) { + debug!("on_remove"); + match self.state { + State::Init | State::Closed => (), + State::Http1(http1) => http1.handler.on_remove(self.transport), + } + } + +} + +impl> Conn { + pub fn new(key: K, transport: T, notify: rotor::Notifier) -> Conn { + Conn(Box::new(ConnInner { + buf: Buffer::new(), + ctrl: channel::new(notify), + keep_alive_enabled: true, + key: key, + state: State::Init, + transport: transport, + })) + } + + pub fn keep_alive(mut self, val: bool) -> Conn { + self.0.keep_alive_enabled = val; + self + } + + pub fn ready(mut self, events: EventSet, scope: &mut Scope) -> Option<(Self, Option)> + where F: MessageHandlerFactory { + trace!("Conn::ready events='{:?}', blocked={:?}", events, self.0.transport.blocked()); + + if events.is_error() { + match self.0.transport.take_socket_error() { + Ok(_) => { + trace!("is_error, but not socket error"); + // spurious? + }, + Err(e) => self.0.on_error(e.into()) + } + } + + // if the user had an io interest, but the transport was blocked differently, + // the event needs to be translated to what the user was actually expecting. + // + // Example: + // - User asks for `Next::write(). + // - But transport is in the middle of renegotiating TLS, and is blocked on reading. + // - hyper should not wait on the `write` event, since epoll already + // knows it is writable. We would just loop a whole bunch, and slow down. + // - So instead, hyper waits on the event needed to unblock the transport, `read`. + // - Once epoll detects the transport is readable, it will alert hyper + // with a `readable` event. + // - hyper needs to translate that `readable` event back into a `write`, + // since that is actually what the Handler wants. + + let events = if let Some(blocked) = self.0.transport.blocked() { + let interest = self.0.interest(); + trace!("translating blocked={:?}, interest={:?}", blocked, interest); + match (blocked, interest) { + (Blocked::Read, Reg::Write) => EventSet::writable(), + (Blocked::Write, Reg::Read) => EventSet::readable(), + // otherwise, the transport was blocked on the same thing the user wanted + _ => events + } + } else { + events + }; + + if events.is_readable() { + self.0.on_readable(scope); + } + + if events.is_writable() { + self.0.on_writable(scope); + } + + let events = match self.0.register() { + Reg::Read => EventSet::readable(), + Reg::Write => EventSet::writable(), + Reg::ReadWrite => EventSet::readable() | EventSet::writable(), + Reg::Wait => EventSet::none(), + Reg::Remove => { + trace!("removing transport"); + let _ = scope.deregister(&self.0.transport); + self.on_remove(); + return None; + }, + }; + + if events.is_readable() && self.0.can_read_more() { + return self.ready(events, scope); + } + + trace!("scope.reregister({:?})", events); + match scope.reregister(&self.0.transport, events, PollOpt::level()) { + Ok(..) => { + let timeout = self.0.state.timeout(); + Some((self, timeout)) + }, + Err(e) => { + trace!("error reregistering: {:?}", e); + self.0.on_error(e.into()); + None + } + } + } + + pub fn wakeup(mut self, scope: &mut Scope) -> Option<(Self, Option)> + where F: MessageHandlerFactory { + while let Ok(next) = self.0.ctrl.1.try_recv() { + trace!("woke up with {:?}", next); + self.0.state.update(next); + } + self.ready(EventSet::readable() | EventSet::writable(), scope) + } + + pub fn timeout(mut self, scope: &mut Scope) -> Option<(Self, Option)> + where F: MessageHandlerFactory { + //TODO: check if this was a spurious timeout? + self.0.on_error(::Error::Timeout); + self.ready(EventSet::none(), scope) + } + + fn on_remove(self) { + self.0.on_remove() + } + } enum State, T: Transport> {