Merge pull request #856 from hyperium/keep-alive
feat(client): implement connection pooling for Client
This commit is contained in:
		| @@ -5,6 +5,7 @@ | ||||
|  | ||||
| use std::collections::HashMap; | ||||
| use std::fmt; | ||||
| use std::io; | ||||
| use std::marker::PhantomData; | ||||
| use std::sync::mpsc; | ||||
| use std::thread; | ||||
| @@ -24,7 +25,6 @@ pub use self::response::Response; | ||||
|  | ||||
| mod connect; | ||||
| mod dns; | ||||
| //mod pool; | ||||
| mod request; | ||||
| mod response; | ||||
|  | ||||
| @@ -116,6 +116,7 @@ impl<H: Send> Client<H> { | ||||
|             loop_.run(Context { | ||||
|                 connect_timeout: connect_timeout, | ||||
|                 keep_alive: keep_alive, | ||||
|                 idle_conns: HashMap::new(), | ||||
|                 queue: HashMap::new(), | ||||
|             }).unwrap() | ||||
|         })); | ||||
| @@ -332,7 +333,7 @@ impl<H: Handler<T>, T: Transport> http::MessageHandler<T> for Message<H, T> { | ||||
| struct Context<K, H> { | ||||
|     connect_timeout: Duration, | ||||
|     keep_alive: bool, | ||||
|     // idle: HashMap<K, Vec<Notify>>, | ||||
|     idle_conns: HashMap<K, Vec<http::Control>>, | ||||
|     queue: HashMap<K, Vec<Queued<H>>>, | ||||
| } | ||||
|  | ||||
| @@ -352,6 +353,27 @@ impl<K: http::Key, H> Context<K, H> { | ||||
|         } | ||||
|         queued | ||||
|     } | ||||
|  | ||||
|     fn conn_response<C>(&mut self, conn: Option<(http::Conn<K, C::Output, Message<H, C::Output>>, Option<Duration>)>, time: rotor::Time) | ||||
|     -> rotor::Response<ClientFsm<C, H>, (C::Key, C::Output)> | ||||
|     where C: Connect<Key=K>, H: Handler<C::Output> { | ||||
|         match conn { | ||||
|             Some((conn, timeout)) => { | ||||
|                 //TODO: HTTP2: a connection doesn't need to be idle to be used for a second stream | ||||
|                 if conn.is_idle() { | ||||
|                     self.idle_conns.entry(conn.key().clone()).or_insert_with(Vec::new) | ||||
|                         .push(conn.control()); | ||||
|                 } | ||||
|                 match timeout { | ||||
|                     Some(dur) => rotor::Response::ok(ClientFsm::Socket(conn)) | ||||
|                         .deadline(time + dur), | ||||
|                     None => rotor::Response::ok(ClientFsm::Socket(conn)), | ||||
|                 } | ||||
|  | ||||
|             } | ||||
|             None => rotor::Response::done() | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<K: http::Key, H: Handler<T>, T: Transport> http::MessageHandlerFactory<K, T> for Context<K, H> { | ||||
| @@ -414,14 +436,9 @@ where C: Connect, | ||||
|                 unreachable!("Connector can never be ready") | ||||
|             }, | ||||
|             ClientFsm::Socket(conn) => { | ||||
|                 match conn.ready(events, scope) { | ||||
|                     Some((conn, None)) => rotor::Response::ok(ClientFsm::Socket(conn)), | ||||
|                     Some((conn, Some(dur))) => { | ||||
|                         rotor::Response::ok(ClientFsm::Socket(conn)) | ||||
|                             .deadline(scope.now() + dur) | ||||
|                     } | ||||
|                     None => rotor::Response::done() | ||||
|                 } | ||||
|                 let res = conn.ready(events, scope); | ||||
|                 let now = scope.now(); | ||||
|                 scope.conn_response(res, now) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| @@ -461,14 +478,9 @@ where C: Connect, | ||||
|                 } | ||||
|             } | ||||
|             ClientFsm::Socket(conn) => { | ||||
|                 match conn.timeout(scope) { | ||||
|                     Some((conn, None)) => rotor::Response::ok(ClientFsm::Socket(conn)), | ||||
|                     Some((conn, Some(dur))) => { | ||||
|                         rotor::Response::ok(ClientFsm::Socket(conn)) | ||||
|                             .deadline(scope.now() + dur) | ||||
|                     } | ||||
|                     None => rotor::Response::done() | ||||
|                 } | ||||
|                 let res = conn.timeout(scope); | ||||
|                 let now = scope.now(); | ||||
|                 scope.conn_response(res, now) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| @@ -478,13 +490,10 @@ where C: Connect, | ||||
|             ClientFsm::Connector(..) => { | ||||
|                 self.connect(scope) | ||||
|             }, | ||||
|             ClientFsm::Socket(conn) => match conn.wakeup(scope) { | ||||
|                 Some((conn, None)) => rotor::Response::ok(ClientFsm::Socket(conn)), | ||||
|                 Some((conn, Some(dur))) => { | ||||
|                     rotor::Response::ok(ClientFsm::Socket(conn)) | ||||
|                         .deadline(scope.now() + dur) | ||||
|                 } | ||||
|                 None => rotor::Response::done() | ||||
|             ClientFsm::Socket(conn) => { | ||||
|                 let res = conn.wakeup(scope); | ||||
|                 let now = scope.now(); | ||||
|                 scope.conn_response(res, now) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| @@ -513,7 +522,41 @@ where C: Connect, | ||||
|                 loop { | ||||
|                     match rx.try_recv() { | ||||
|                         Ok(Notify::Connect(url, mut handler)) => { | ||||
|                             // TODO: check pool for sockets to this domain | ||||
|                             // check pool for sockets to this domain | ||||
|                             if let Some(key) = connector.key(&url) { | ||||
|                                 let mut remove_idle = false; | ||||
|                                 let mut woke_up = false; | ||||
|                                 if let Some(mut idle) = scope.idle_conns.get_mut(&key) { | ||||
|                                     while !idle.is_empty() { | ||||
|                                         let ctrl = idle.remove(0); | ||||
|                                         // err means the socket has since died | ||||
|                                         if ctrl.ready(Next::write()).is_ok() { | ||||
|                                             woke_up = true; | ||||
|                                             break; | ||||
|                                         } | ||||
|                                     } | ||||
|                                     remove_idle = idle.is_empty(); | ||||
|                                 } | ||||
|                                 if remove_idle { | ||||
|                                     scope.idle_conns.remove(&key); | ||||
|                                 } | ||||
|  | ||||
|                                 if woke_up { | ||||
|                                     trace!("woke up idle conn for '{}'", url); | ||||
|                                     let deadline = scope.now() + scope.connect_timeout; | ||||
|                                     scope.queue.entry(key).or_insert_with(Vec::new).push(Queued { | ||||
|                                         deadline: deadline, | ||||
|                                         handler: handler, | ||||
|                                         url: url | ||||
|                                     }); | ||||
|                                     continue; | ||||
|                                 } | ||||
|                             } else { | ||||
|                                 // this connector cannot handle this url anyways | ||||
|                                 let _ = handler.on_error(io::Error::new(io::ErrorKind::InvalidInput, "invalid url for connector").into()); | ||||
|                                 continue; | ||||
|                             } | ||||
|                             // no exist connection, call connector | ||||
|                             match connector.connect(&url) { | ||||
|                                 Ok(key) => { | ||||
|                                     let deadline = scope.now() + scope.connect_timeout; | ||||
|   | ||||
| @@ -63,8 +63,8 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|     fn interest(&self) -> Reg { | ||||
|         match self.state { | ||||
|             State::Closed => Reg::Remove, | ||||
|             State::Init => { | ||||
|                 <H as MessageHandler>::Message::initial_interest().interest() | ||||
|             State::Init { interest, .. } => { | ||||
|                 interest.register() | ||||
|             } | ||||
|             State::Http1(Http1 { reading: Reading::Closed, writing: Writing::Closed, .. }) => { | ||||
|                 Reg::Remove | ||||
| @@ -142,12 +142,12 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|  | ||||
|     fn read<F: MessageHandlerFactory<K, T, Output=H>>(&mut self, scope: &mut Scope<F>, state: State<H, T>) -> State<H, T> { | ||||
|          match state { | ||||
|             State::Init => { | ||||
|             State::Init { interest: Next_::Read, .. } => { | ||||
|                 let head = match self.parse() { | ||||
|                     Ok(head) => head, | ||||
|                     Err(::Error::Io(e)) => match e.kind() { | ||||
|                         io::ErrorKind::WouldBlock | | ||||
|                         io::ErrorKind::Interrupted => return State::Init, | ||||
|                         io::ErrorKind::Interrupted => return state, | ||||
|                         _ => { | ||||
|                             debug!("io error trying to parse {:?}", e); | ||||
|                             return State::Closed; | ||||
| @@ -219,6 +219,10 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|                     } | ||||
|                 } | ||||
|             }, | ||||
|             State::Init { .. } => { | ||||
|                 trace!("on_readable State::{:?}", state); | ||||
|                 state | ||||
|             }, | ||||
|             State::Http1(mut http1) => { | ||||
|                 let next = match http1.reading { | ||||
|                     Reading::Init => None, | ||||
| @@ -274,7 +278,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|                 if let Some(next) = next { | ||||
|                     s.update(next); | ||||
|                 } | ||||
|                 trace!("Conn.on_readable State::Http1 completed, new state = {:?}", s); | ||||
|                 trace!("Conn.on_readable State::Http1 completed, new state = State::{:?}", s); | ||||
|  | ||||
|                 let again = match s { | ||||
|                     State::Http1(Http1 { reading: Reading::Body(ref encoder), .. }) => encoder.is_eof(), | ||||
| @@ -296,7 +300,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|  | ||||
|     fn write<F: MessageHandlerFactory<K, T, Output=H>>(&mut self, scope: &mut Scope<F>, mut state: State<H, T>) -> State<H, T> { | ||||
|         let next = match state { | ||||
|             State::Init => { | ||||
|             State::Init { interest: Next_::Write, .. } => { | ||||
|                 // this could be a Client request, which writes first, so pay | ||||
|                 // attention to the version written here, which will adjust | ||||
|                 // our internal state to Http1 or Http2 | ||||
| @@ -336,6 +340,10 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|                 } | ||||
|                 Some(interest) | ||||
|             } | ||||
|             State::Init { .. } => { | ||||
|                 trace!("Conn.on_writable State::{:?}", state); | ||||
|                 None | ||||
|             } | ||||
|             State::Http1(Http1 { ref mut handler, ref mut writing, ref mut keep_alive, .. }) => { | ||||
|                 match *writing { | ||||
|                     Writing::Init => { | ||||
| @@ -426,7 +434,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|  | ||||
|     fn can_read_more(&self) -> bool { | ||||
|         match self.state { | ||||
|             State::Init => false, | ||||
|             State::Init { .. } => false, | ||||
|             _ => !self.buf.is_empty() | ||||
|         } | ||||
|     } | ||||
| @@ -435,7 +443,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|         debug!("on_error err = {:?}", err); | ||||
|         trace!("on_error state = {:?}", self.state); | ||||
|         let next = match self.state { | ||||
|             State::Init => Next::remove(), | ||||
|             State::Init { .. } => Next::remove(), | ||||
|             State::Http1(ref mut http1) => http1.handler.on_error(err), | ||||
|             State::Closed => Next::remove(), | ||||
|         }; | ||||
| @@ -461,7 +469,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|     fn on_remove(self) { | ||||
|         debug!("on_remove"); | ||||
|         match self.state { | ||||
|             State::Init | State::Closed => (), | ||||
|             State::Init { .. } | State::Closed => (), | ||||
|             State::Http1(http1) => http1.handler.on_remove(self.transport), | ||||
|         } | ||||
|     } | ||||
| @@ -475,7 +483,10 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> { | ||||
|             ctrl: channel::new(notify), | ||||
|             keep_alive_enabled: true, | ||||
|             key: key, | ||||
|             state: State::Init, | ||||
|             state: State::Init { | ||||
|                 interest: H::Message::initial_interest().interest, | ||||
|                 timeout: None, | ||||
|             }, | ||||
|             transport: transport, | ||||
|         })) | ||||
|     } | ||||
| @@ -585,10 +596,30 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> { | ||||
|         self.0.on_remove() | ||||
|     } | ||||
|  | ||||
|     pub fn key(&self) -> &K { | ||||
|         &self.0.key | ||||
|     } | ||||
|  | ||||
|     pub fn control(&self) -> Control { | ||||
|         Control { | ||||
|             tx: self.0.ctrl.0.clone(), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn is_idle(&self) -> bool { | ||||
|         if let State::Init { interest: Next_::Wait, .. } = self.0.state { | ||||
|             true | ||||
|         } else { | ||||
|             false | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| enum State<H: MessageHandler<T>, T: Transport> { | ||||
|     Init, | ||||
|     Init { | ||||
|         interest: Next_, | ||||
|         timeout: Option<Duration>, | ||||
|     }, | ||||
|     /// Http1 will only ever use a connection to send and receive a single | ||||
|     /// message at a time. Once a H1 status has been determined, we will either | ||||
|     /// be reading or writing an H1 message, and optionally multiple if | ||||
| @@ -606,7 +637,7 @@ enum State<H: MessageHandler<T>, T: Transport> { | ||||
| impl<H: MessageHandler<T>, T: Transport> State<H, T> { | ||||
|     fn timeout(&self) -> Option<Duration> { | ||||
|         match *self { | ||||
|             State::Init => None, | ||||
|             State::Init { timeout, .. } => timeout, | ||||
|             State::Http1(ref http1) => http1.timeout, | ||||
|             State::Closed => None, | ||||
|         } | ||||
| @@ -616,7 +647,10 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> { | ||||
| impl<H: MessageHandler<T>, T: Transport> fmt::Debug for State<H, T> { | ||||
|     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||
|         match *self { | ||||
|             State::Init => f.write_str("Init"), | ||||
|             State::Init { interest, timeout } => f.debug_struct("Init") | ||||
|                 .field("interest", &interest) | ||||
|                 .field("timeout", &timeout) | ||||
|                 .finish(), | ||||
|             State::Http1(ref h1) => f.debug_tuple("Http1") | ||||
|                 .field(h1) | ||||
|                 .finish(), | ||||
| @@ -632,10 +666,14 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> { | ||||
|         let new_state = match (state, next.interest) { | ||||
|             (_, Next_::Remove) => State::Closed, | ||||
|             (State::Closed, _) => State::Closed, | ||||
|             (State::Init, _) => State::Init, | ||||
|             (State::Init { timeout, .. }, e) => State::Init { | ||||
|                 interest: e, | ||||
|                 timeout: timeout, | ||||
|             }, | ||||
|             (State::Http1(http1), Next_::End) => { | ||||
|                 let reading = match http1.reading { | ||||
|                     Reading::Body(ref decoder) if decoder.is_eof() => { | ||||
|                     Reading::Body(ref decoder) | | ||||
|                     Reading::Wait(ref decoder) if decoder.is_eof() => { | ||||
|                         if http1.keep_alive { | ||||
|                             Reading::KeepAlive | ||||
|                         } else { | ||||
| @@ -646,6 +684,7 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> { | ||||
|                     _ => Reading::Closed, | ||||
|                 }; | ||||
|                 let writing = match http1.writing { | ||||
|                     Writing::Wait(encoder) | | ||||
|                     Writing::Ready(encoder) => { | ||||
|                         if encoder.is_eof() { | ||||
|                             if http1.keep_alive { | ||||
| @@ -691,8 +730,11 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> { | ||||
|                 }; | ||||
|                 match (reading, writing) { | ||||
|                     (Reading::KeepAlive, Writing::KeepAlive) => { | ||||
|                         //http1.handler.on_keep_alive(); | ||||
|                         State::Init | ||||
|                         //XXX keepalive | ||||
|                         State::Init { | ||||
|                         interest: H::Message::keep_alive_interest().interest, | ||||
|                             timeout: None, | ||||
|                         } | ||||
|                     }, | ||||
|                     (reading, Writing::Chunk(chunk)) => { | ||||
|                         State::Http1(Http1 { | ||||
|   | ||||
| @@ -31,6 +31,10 @@ impl Http1Message for ServerMessage { | ||||
|         Next::new(Next_::Read) | ||||
|     } | ||||
|  | ||||
|     fn keep_alive_interest() -> Next { | ||||
|         Next::new(Next_::Read) | ||||
|     } | ||||
|  | ||||
|     fn parse(buf: &[u8]) -> ParseResult<RequestLine> { | ||||
|         let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS]; | ||||
|         trace!("Request.parse([Header; {}], [u8; {}])", headers.len(), buf.len()); | ||||
| @@ -114,6 +118,10 @@ impl Http1Message for ClientMessage { | ||||
|         Next::new(Next_::Write) | ||||
|     } | ||||
|  | ||||
|     fn keep_alive_interest() -> Next { | ||||
|         Next::new(Next_::Wait) | ||||
|     } | ||||
|  | ||||
|     fn parse(buf: &[u8]) -> ParseResult<RawStatus> { | ||||
|         let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS]; | ||||
|         trace!("Response.parse([Header; {}], [u8; {}])", headers.len(), buf.len()); | ||||
|   | ||||
| @@ -249,14 +249,16 @@ impl Deserialize for RawStatus { | ||||
| /// Checks if a connection should be kept alive. | ||||
| #[inline] | ||||
| pub fn should_keep_alive(version: HttpVersion, headers: &Headers) -> bool { | ||||
|     trace!("should_keep_alive( {:?}, {:?} )", version, headers.get::<Connection>()); | ||||
|     match (version, headers.get::<Connection>()) { | ||||
|     let ret = match (version, headers.get::<Connection>()) { | ||||
|         (Http10, None) => false, | ||||
|         (Http10, Some(conn)) if !conn.contains(&KeepAlive) => false, | ||||
|         (Http11, Some(conn)) if conn.contains(&Close)  => false, | ||||
|         _ => true | ||||
|     } | ||||
|     }; | ||||
|     trace!("should_keep_alive(version={:?}, header={:?}) = {:?}", version, headers.get::<Connection>(), ret); | ||||
|     ret | ||||
| } | ||||
|  | ||||
| pub type ParseResult<T> = ::Result<Option<(MessageHead<T>, usize)>>; | ||||
|  | ||||
| pub fn parse<T: Http1Message<Incoming=I>, I>(rdr: &[u8]) -> ParseResult<I> { | ||||
| @@ -280,6 +282,7 @@ pub trait Http1Message { | ||||
|     type Outgoing: Default; | ||||
|     //TODO: replace with associated const when stable | ||||
|     fn initial_interest() -> Next; | ||||
|     fn keep_alive_interest() -> Next; | ||||
|     fn parse(bytes: &[u8]) -> ParseResult<Self::Incoming>; | ||||
|     fn decoder(head: &MessageHead<Self::Incoming>) -> ::Result<h1::Decoder>; | ||||
|     fn encode(head: MessageHead<Self::Outgoing>, dst: &mut Vec<u8>) -> h1::Encoder; | ||||
| @@ -304,6 +307,7 @@ impl fmt::Debug for Next { | ||||
|     } | ||||
| } | ||||
|  | ||||
| // Internal enum for `Next` | ||||
| #[derive(Debug, Clone, Copy)] | ||||
| enum Next_ { | ||||
|     Read, | ||||
| @@ -314,6 +318,8 @@ enum Next_ { | ||||
|     Remove, | ||||
| } | ||||
|  | ||||
| // An enum representing all the possible actions to taken when registering | ||||
| // with the event loop. | ||||
| #[derive(Debug, Clone, Copy)] | ||||
| enum Reg { | ||||
|     Read, | ||||
| @@ -361,16 +367,11 @@ impl Next { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn interest(&self) -> Reg { | ||||
|         match self.interest { | ||||
|             Next_::Read => Reg::Read, | ||||
|             Next_::Write => Reg::Write, | ||||
|             Next_::ReadWrite => Reg::ReadWrite, | ||||
|             Next_::Wait => Reg::Wait, | ||||
|             Next_::End => Reg::Remove, | ||||
|             Next_::Remove => Reg::Remove, | ||||
|         } | ||||
|     /* | ||||
|     fn reg(&self) -> Reg { | ||||
|         self.interest.register() | ||||
|     } | ||||
|     */ | ||||
|  | ||||
|     /// Signals the desire to read from the transport. | ||||
|     pub fn read() -> Next { | ||||
| @@ -410,6 +411,19 @@ impl Next { | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Next_ { | ||||
|     fn register(&self) -> Reg { | ||||
|         match *self { | ||||
|             Next_::Read => Reg::Read, | ||||
|             Next_::Write => Reg::Write, | ||||
|             Next_::ReadWrite => Reg::ReadWrite, | ||||
|             Next_::Wait => Reg::Wait, | ||||
|             Next_::End => Reg::Remove, | ||||
|             Next_::Remove => Reg::Remove, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[test] | ||||
| fn test_should_keep_alive() { | ||||
|     let mut headers = Headers::new(); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user