fix(client): fix panics when some errors occured inside HttpMessage

BREAKING CHANGE: This changes the signature of HttpWriter.end(),
  returning a `EndError` that is similar to std::io::IntoInnerError,
  allowing HttpMessage to retrieve the broken connections and not panic.

  The breaking change isn't exposed in any usage of the `Client` API,
  but for anyone using `HttpWriter` directly, since this was technically
  a public method, that change is breaking.
This commit is contained in:
Sean McArthur
2015-08-27 15:43:22 -07:00
parent 4f672fe744
commit ef15257b73
2 changed files with 112 additions and 62 deletions

View File

@@ -226,4 +226,14 @@ mod tests {
assert_eq!(read_to_string(res).unwrap(), "1".to_owned()); assert_eq!(read_to_string(res).unwrap(), "1".to_owned());
} }
#[test]
fn test_parse_error_closes() {
let url = Url::parse("http://hyper.rs").unwrap();
let stream = MockStream::with_input(b"\
definitely not http
");
assert!(Response::new(url, Box::new(stream)).is_err());
}
} }

View File

@@ -74,12 +74,15 @@ impl Read for Http11Message {
impl HttpMessage for Http11Message { impl HttpMessage for Http11Message {
fn set_outgoing(&mut self, mut head: RequestHead) -> ::Result<RequestHead> { fn set_outgoing(&mut self, mut head: RequestHead) -> ::Result<RequestHead> {
if self.stream.is_none() { let stream = match self.stream.take() {
return Err(From::from(io::Error::new( Some(stream) => stream,
io::ErrorKind::Other, None => {
"Message not idle, cannot start new outgoing"))); return Err(From::from(io::Error::new(
} io::ErrorKind::Other,
let mut stream = BufWriter::new(self.stream.take().unwrap()); "Message not idle, cannot start new outgoing")));
}
};
let mut stream = BufWriter::new(stream);
let mut uri = head.url.serialize_path().unwrap(); let mut uri = head.url.serialize_path().unwrap();
if let Some(ref q) = head.url.query { if let Some(ref q) = head.url.query {
@@ -92,72 +95,89 @@ impl HttpMessage for Http11Message {
try!(write!(&mut stream, "{} {} {}{}", try!(write!(&mut stream, "{} {} {}{}",
head.method, uri, version, LINE_ENDING)); head.method, uri, version, LINE_ENDING));
let stream = match &head.method { let stream = {
&Method::Get | &Method::Head => { let mut write_headers = |mut stream: BufWriter<Box<NetworkStream + Send>>, head: &RequestHead| {
debug!("headers={:?}", head.headers); debug!("headers={:?}", head.headers);
try!(write!(&mut stream, "{}{}", head.headers, LINE_ENDING)); match write!(&mut stream, "{}{}", head.headers, LINE_ENDING) {
EmptyWriter(stream) Ok(_) => Ok(stream),
}, Err(e) => {
_ => { self.stream = Some(stream.into_inner().unwrap());
let mut chunked = true; Err(e)
let mut len = 0;
match head.headers.get::<header::ContentLength>() {
Some(cl) => {
chunked = false;
len = **cl;
},
None => ()
};
// can't do in match above, thanks borrowck
if chunked {
let encodings = match head.headers.get_mut::<header::TransferEncoding>() {
Some(&mut header::TransferEncoding(ref mut encodings)) => {
//TODO: check if chunked is already in encodings. use HashSet?
encodings.push(header::Encoding::Chunked);
false
},
None => true
};
if encodings {
head.headers.set::<header::TransferEncoding>(
header::TransferEncoding(vec![header::Encoding::Chunked]))
} }
} }
};
match &head.method {
&Method::Get | &Method::Head => {
EmptyWriter(try!(write_headers(stream, &head)))
},
_ => {
let mut chunked = true;
let mut len = 0;
debug!("headers={:?}", head.headers); match head.headers.get::<header::ContentLength>() {
try!(write!(&mut stream, "{}{}", head.headers, LINE_ENDING)); Some(cl) => {
chunked = false;
len = **cl;
},
None => ()
};
if chunked { // can't do in match above, thanks borrowck
ChunkedWriter(stream) if chunked {
} else { let encodings = match head.headers.get_mut::<header::TransferEncoding>() {
SizedWriter(stream, len) Some(encodings) => {
//TODO: check if chunked is already in encodings. use HashSet?
encodings.push(header::Encoding::Chunked);
false
},
None => true
};
if encodings {
head.headers.set(
header::TransferEncoding(vec![header::Encoding::Chunked]))
}
}
let stream = try!(write_headers(stream, &head));
if chunked {
ChunkedWriter(stream)
} else {
SizedWriter(stream, len)
}
} }
} }
}; };
self.method = Some(head.method.clone());
self.writer = Some(stream); self.writer = Some(stream);
self.method = Some(head.method.clone());
Ok(head) Ok(head)
} }
fn get_incoming(&mut self) -> ::Result<ResponseHead> { fn get_incoming(&mut self) -> ::Result<ResponseHead> {
try!(self.flush_outgoing()); try!(self.flush_outgoing());
if self.stream.is_none() { let stream = match self.stream.take() {
// The message was already in the reading state... Some(stream) => stream,
// TODO Decide what happens in case we try to get a new incoming at that point None => {
return Err(From::from( // The message was already in the reading state...
io::Error::new(io::ErrorKind::Other, // TODO Decide what happens in case we try to get a new incoming at that point
"Read already in progress"))); return Err(From::from(
} io::Error::new(io::ErrorKind::Other,
"Read already in progress")));
}
};
let stream = self.stream.take().unwrap();
let mut stream = BufReader::new(stream); let mut stream = BufReader::new(stream);
let head = try!(parse_response(&mut stream)); let head = match parse_response(&mut stream) {
Ok(head) => head,
Err(e) => {
self.stream = Some(stream.into_inner());
return Err(e);
}
};
let raw_status = head.subject; let raw_status = head.subject;
let headers = head.headers; let headers = head.headers;
@@ -170,7 +190,7 @@ impl HttpMessage for Http11Message {
// 5. Content-Length header has a sized body. // 5. Content-Length header has a sized body.
// 6. Not Client. // 6. Not Client.
// 7. Read till EOF. // 7. Read till EOF.
let body = match (method, raw_status.0) { self.reader = Some(match (method, raw_status.0) {
(Method::Head, _) => EmptyReader(stream), (Method::Head, _) => EmptyReader(stream),
(_, 100...199) | (_, 204) | (_, 304) => EmptyReader(stream), (_, 100...199) | (_, 204) | (_, 304) => EmptyReader(stream),
(Method::Connect, 200...299) => EmptyReader(stream), (Method::Connect, 200...299) => EmptyReader(stream),
@@ -192,9 +212,8 @@ impl HttpMessage for Http11Message {
EofReader(stream) EofReader(stream)
} }
} }
}; });
self.reader = Some(body);
Ok(ResponseHead { Ok(ResponseHead {
headers: headers, headers: headers,
@@ -292,9 +311,15 @@ impl Http11Message {
}; };
let writer = self.writer.take().unwrap(); let writer = self.writer.take().unwrap();
let raw = try!(writer.end()).into_inner().unwrap(); // end() already flushes // end() already flushes
let raw = match writer.end() {
Ok(buf) => buf.into_inner().unwrap(),
Err(e) => {
self.writer = Some(e.1);
return Err(From::from(e.0));
}
};
self.stream = Some(raw); self.stream = Some(raw);
Ok(()) Ok(())
} }
} }
@@ -617,10 +642,25 @@ impl<W: Write> HttpWriter<W> {
/// A final `write_all()` is called with an empty message, and then flushed. /// A final `write_all()` is called with an empty message, and then flushed.
/// The ChunkedWriter variant will use this to write the 0-sized last-chunk. /// The ChunkedWriter variant will use this to write the 0-sized last-chunk.
#[inline] #[inline]
pub fn end(mut self) -> io::Result<W> { pub fn end(mut self) -> Result<W, EndError<W>> {
try!(self.write(&[])); fn inner<W: Write>(w: &mut W) -> io::Result<()> {
try!(self.flush()); try!(w.write(&[]));
Ok(self.into_inner()) w.flush()
}
match inner(&mut self) {
Ok(..) => Ok(self.into_inner()),
Err(e) => Err(EndError(e, self))
}
}
}
#[derive(Debug)]
pub struct EndError<W: Write>(io::Error, HttpWriter<W>);
impl<W: Write> From<EndError<W>> for io::Error {
fn from(e: EndError<W>) -> io::Error {
e.0
} }
} }