fix(server): correctly handle CONNECT requests
- In the higher-level `Server` API, since connection upgrades aren't yet supported, returning a 2xx response to a `CONNECT` request is a user error. A 500 response is written to the client, the connection is closed, and an error is reported back to the user. - In the lower-level `server::Connection` API, where upgrades *are* supported, a 2xx response correctly marks the response as the final one, instead of trying to parse more requests afterwards.
This commit is contained in:
		| @@ -191,7 +191,9 @@ where | |||||||
|         // This is because Service only allows returning a single Response, and |         // This is because Service only allows returning a single Response, and | ||||||
|         // so if you try to reply with a e.g. 100 Continue, you have no way of |         // so if you try to reply with a e.g. 100 Continue, you have no way of | ||||||
|         // replying with the latter status code response. |         // replying with the latter status code response. | ||||||
|         let (ret, mut is_last) = if StatusCode::SWITCHING_PROTOCOLS == msg.head.subject { |         let is_upgrade = msg.head.subject == StatusCode::SWITCHING_PROTOCOLS | ||||||
|  |             || (msg.req_method == &Some(Method::CONNECT) && msg.head.subject.is_success()); | ||||||
|  |         let (ret, mut is_last) = if is_upgrade { | ||||||
|             (T::on_encode_upgrade(&mut msg), true) |             (T::on_encode_upgrade(&mut msg), true) | ||||||
|         } else if msg.head.subject.is_informational() { |         } else if msg.head.subject.is_informational() { | ||||||
|             error!("response with 1xx status code not supported"); |             error!("response with 1xx status code not supported"); | ||||||
| @@ -851,13 +853,21 @@ impl OnUpgrade for YesUpgrades { | |||||||
|  |  | ||||||
| impl OnUpgrade for NoUpgrades { | impl OnUpgrade for NoUpgrades { | ||||||
|     fn on_encode_upgrade(msg: &mut Encode<StatusCode>) -> ::Result<()> { |     fn on_encode_upgrade(msg: &mut Encode<StatusCode>) -> ::Result<()> { | ||||||
|         error!("response with 101 status code not supported"); |  | ||||||
|         *msg.head = MessageHead::default(); |         *msg.head = MessageHead::default(); | ||||||
|         msg.head.subject = ::StatusCode::INTERNAL_SERVER_ERROR; |         msg.head.subject = ::StatusCode::INTERNAL_SERVER_ERROR; | ||||||
|         msg.body = None; |         msg.body = None; | ||||||
|         //TODO: replace with more descriptive error |  | ||||||
|  |         if msg.head.subject == StatusCode::SWITCHING_PROTOCOLS { | ||||||
|  |             error!("response with 101 status code not supported"); | ||||||
|  |             Err(Parse::UpgradeNotSupported.into()) | ||||||
|  |         } else if msg.req_method == &Some(Method::CONNECT) { | ||||||
|  |             error!("200 response to CONNECT request not supported"); | ||||||
|  |             Err(::Error::new_user_unsupported_request_method()) | ||||||
|  |         } else { | ||||||
|  |             debug_assert!(false, "upgrade incorrectly detected"); | ||||||
|             Err(::Error::new_status()) |             Err(::Error::new_status()) | ||||||
|         } |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     fn on_decode_upgrade() -> Result<Decoder, Parse> { |     fn on_decode_upgrade() -> Result<Decoder, Parse> { | ||||||
|         debug!("received 101 upgrade response, not supported"); |         debug!("received 101 upgrade response, not supported"); | ||||||
| @@ -1309,6 +1319,39 @@ mod tests { | |||||||
|         assert_eq!(vec, b"GET / HTTP/1.1\r\nContent-Length: 10\r\nContent-Type: application/json\r\n\r\n".to_vec()); |         assert_eq!(vec, b"GET / HTTP/1.1\r\nContent-Length: 10\r\nContent-Type: application/json\r\n\r\n".to_vec()); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_server_no_upgrades_connect_method() { | ||||||
|  |         let mut head = MessageHead::default(); | ||||||
|  |  | ||||||
|  |         let mut vec = Vec::new(); | ||||||
|  |         let err = Server::encode(Encode { | ||||||
|  |             head: &mut head, | ||||||
|  |             body: None, | ||||||
|  |             keep_alive: true, | ||||||
|  |             req_method: &mut Some(Method::CONNECT), | ||||||
|  |             title_case_headers: false, | ||||||
|  |         }, &mut vec).unwrap_err(); | ||||||
|  |  | ||||||
|  |         assert!(err.is_user()); | ||||||
|  |         assert_eq!(err.kind(), &::error::Kind::UnsupportedRequestMethod); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_server_yes_upgrades_connect_method() { | ||||||
|  |         let mut head = MessageHead::default(); | ||||||
|  |  | ||||||
|  |         let mut vec = Vec::new(); | ||||||
|  |         let encoder = S::<YesUpgrades>::encode(Encode { | ||||||
|  |             head: &mut head, | ||||||
|  |             body: None, | ||||||
|  |             keep_alive: true, | ||||||
|  |             req_method: &mut Some(Method::CONNECT), | ||||||
|  |             title_case_headers: false, | ||||||
|  |         }, &mut vec).unwrap(); | ||||||
|  |  | ||||||
|  |         assert!(encoder.is_last()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     #[cfg(feature = "nightly")] |     #[cfg(feature = "nightly")] | ||||||
|     use test::Bencher; |     use test::Bencher; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1147,6 +1147,70 @@ fn upgrades() { | |||||||
|     assert_eq!(vec, b"bar=foo"); |     assert_eq!(vec, b"bar=foo"); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn http_connect() { | ||||||
|  |     use tokio_io::io::{read_to_end, write_all}; | ||||||
|  |     let _ = pretty_env_logger::try_init(); | ||||||
|  |     let runtime = Runtime::new().unwrap(); | ||||||
|  |     let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); | ||||||
|  |     let addr = listener.local_addr().unwrap(); | ||||||
|  |     let (tx, rx) = oneshot::channel(); | ||||||
|  |  | ||||||
|  |     thread::spawn(move || { | ||||||
|  |         let mut tcp = connect(&addr); | ||||||
|  |         tcp.write_all(b"\ | ||||||
|  |             CONNECT localhost:80 HTTP/1.1\r\n\ | ||||||
|  |             \r\n\ | ||||||
|  |             eagerly optimistic\ | ||||||
|  |         ").expect("write 1"); | ||||||
|  |         let mut buf = [0; 256]; | ||||||
|  |         tcp.read(&mut buf).expect("read 1"); | ||||||
|  |  | ||||||
|  |         let expected = "HTTP/1.1 200 OK\r\n"; | ||||||
|  |         assert_eq!(s(&buf[..expected.len()]), expected); | ||||||
|  |         let _ = tx.send(()); | ||||||
|  |  | ||||||
|  |         let n = tcp.read(&mut buf).expect("read 2"); | ||||||
|  |         assert_eq!(s(&buf[..n]), "foo=bar"); | ||||||
|  |         tcp.write_all(b"bar=foo").expect("write 2"); | ||||||
|  |     }); | ||||||
|  |  | ||||||
|  |     let fut = listener.incoming() | ||||||
|  |         .into_future() | ||||||
|  |         .map_err(|_| -> hyper::Error { unreachable!() }) | ||||||
|  |         .and_then(|(item, _incoming)| { | ||||||
|  |             let socket = item.unwrap(); | ||||||
|  |             let conn = Http::new() | ||||||
|  |                 .serve_connection(socket, service_fn(|_| { | ||||||
|  |                     let res = Response::builder() | ||||||
|  |                         .status(200) | ||||||
|  |                         .body(hyper::Body::empty()) | ||||||
|  |                         .unwrap(); | ||||||
|  |                     Ok::<_, hyper::Error>(res) | ||||||
|  |                 })); | ||||||
|  |  | ||||||
|  |             let mut conn_opt = Some(conn); | ||||||
|  |             future::poll_fn(move || { | ||||||
|  |                 try_ready!(conn_opt.as_mut().unwrap().poll_without_shutdown()); | ||||||
|  |                 // conn is done with HTTP now | ||||||
|  |                 Ok(conn_opt.take().unwrap().into()) | ||||||
|  |             }) | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |     let conn = fut.wait().unwrap(); | ||||||
|  |  | ||||||
|  |     // wait so that we don't write until other side saw 101 response | ||||||
|  |     rx.wait().unwrap(); | ||||||
|  |  | ||||||
|  |     let parts = conn.into_parts(); | ||||||
|  |     let io = parts.io; | ||||||
|  |     assert_eq!(parts.read_buf, "eagerly optimistic"); | ||||||
|  |  | ||||||
|  |     let io = write_all(io, b"foo=bar").wait().unwrap().0; | ||||||
|  |     let vec = read_to_end(io, vec![]).wait().unwrap().1; | ||||||
|  |     assert_eq!(vec, b"bar=foo"); | ||||||
|  | } | ||||||
|  |  | ||||||
| #[test] | #[test] | ||||||
| fn parse_errors_send_4xx_response() { | fn parse_errors_send_4xx_response() { | ||||||
|     let runtime = Runtime::new().unwrap(); |     let runtime = Runtime::new().unwrap(); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user