Merge pull request #648 from mlalic/h1-msg-keep-calm

[WIP] Add a stream state enum that makes it impossible to lose a stream
This commit is contained in:
Sean McArthur
2015-09-09 10:50:37 -07:00

View File

@@ -35,19 +35,70 @@ use version;
const MAX_INVALID_RESPONSE_BYTES: usize = 1024 * 128; const MAX_INVALID_RESPONSE_BYTES: usize = 1024 * 128;
#[derive(Debug)]
struct Wrapper<T> {
obj: Option<T>,
}
impl<T> Wrapper<T> {
pub fn new(obj: T) -> Wrapper<T> {
Wrapper { obj: Some(obj) }
}
pub fn map_in_place<F>(&mut self, f: F) where F: FnOnce(T) -> T {
let obj = self.obj.take().unwrap();
let res = f(obj);
self.obj = Some(res);
}
pub fn into_inner(self) -> T { self.obj.unwrap() }
pub fn as_mut(&mut self) -> &mut T { self.obj.as_mut().unwrap() }
pub fn as_ref(&self) -> &T { self.obj.as_ref().unwrap() }
}
#[derive(Debug)]
enum Stream {
Idle(Box<NetworkStream + Send>),
Writing(HttpWriter<BufWriter<Box<NetworkStream + Send>>>),
Reading(HttpReader<BufReader<Box<NetworkStream + Send>>>),
}
impl Stream {
fn writer_mut(&mut self) -> Option<&mut HttpWriter<BufWriter<Box<NetworkStream + Send>>>> {
match *self {
Stream::Writing(ref mut writer) => Some(writer),
_ => None,
}
}
fn reader_mut(&mut self) -> Option<&mut HttpReader<BufReader<Box<NetworkStream + Send>>>> {
match *self {
Stream::Reading(ref mut reader) => Some(reader),
_ => None,
}
}
fn reader_ref(&self) -> Option<&HttpReader<BufReader<Box<NetworkStream + Send>>>> {
match *self {
Stream::Reading(ref reader) => Some(reader),
_ => None,
}
}
fn new(stream: Box<NetworkStream + Send>) -> Stream {
Stream::Idle(stream)
}
}
/// An implementation of the `HttpMessage` trait for HTTP/1.1. /// An implementation of the `HttpMessage` trait for HTTP/1.1.
#[derive(Debug)] #[derive(Debug)]
pub struct Http11Message { pub struct Http11Message {
method: Option<Method>, method: Option<Method>,
stream: Option<Box<NetworkStream + Send>>, stream: Wrapper<Stream>,
writer: Option<HttpWriter<BufWriter<Box<NetworkStream + Send>>>>,
reader: Option<HttpReader<BufReader<Box<NetworkStream + Send>>>>,
} }
impl Write for Http11Message { impl Write for Http11Message {
#[inline] #[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.writer { match self.stream.as_mut().writer_mut() {
None => Err(io::Error::new(io::ErrorKind::Other, None => Err(io::Error::new(io::ErrorKind::Other,
"Not in a writable state")), "Not in a writable state")),
Some(ref mut writer) => writer.write(buf), Some(ref mut writer) => writer.write(buf),
@@ -55,7 +106,7 @@ impl Write for Http11Message {
} }
#[inline] #[inline]
fn flush(&mut self) -> io::Result<()> { fn flush(&mut self) -> io::Result<()> {
match self.writer { match self.stream.as_mut().writer_mut() {
None => Err(io::Error::new(io::ErrorKind::Other, None => Err(io::Error::new(io::ErrorKind::Other,
"Not in a writable state")), "Not in a writable state")),
Some(ref mut writer) => writer.flush(), Some(ref mut writer) => writer.flush(),
@@ -66,7 +117,7 @@ impl Write for Http11Message {
impl Read for Http11Message { impl Read for Http11Message {
#[inline] #[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.reader { match self.stream.as_mut().reader_mut() {
None => Err(io::Error::new(io::ErrorKind::Other, None => Err(io::Error::new(io::ErrorKind::Other,
"Not in a readable state")), "Not in a readable state")),
Some(ref mut reader) => reader.read(buf), Some(ref mut reader) => reader.read(buf),
@@ -76,13 +127,19 @@ impl Read for Http11Message {
impl HttpMessage for Http11Message { impl HttpMessage for Http11Message {
fn set_outgoing(&mut self, mut head: RequestHead) -> ::Result<RequestHead> { fn set_outgoing(&mut self, mut head: RequestHead) -> ::Result<RequestHead> {
let stream = match self.stream.take() { let mut res = Err(Error::from(io::Error::new(
Some(stream) => stream, io::ErrorKind::Other,
None => { "")));
return Err(From::from(io::Error::new( let mut method = None;
self.stream.map_in_place(|stream: Stream| -> Stream {
let stream = match stream {
Stream::Idle(stream) => stream,
_ => {
res = Err(Error::from(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
"Message not idle, cannot start new outgoing"))); "Message not idle, cannot start new outgoing")));
} return stream;
},
}; };
let mut stream = BufWriter::new(stream); let mut stream = BufWriter::new(stream);
@@ -94,23 +151,37 @@ impl HttpMessage for Http11Message {
let version = version::HttpVersion::Http11; let version = version::HttpVersion::Http11;
debug!("request line: {:?} {:?} {:?}", head.method, uri, version); debug!("request line: {:?} {:?} {:?}", head.method, uri, version);
try!(write!(&mut stream, "{} {} {}{}", match write!(&mut stream, "{} {} {}{}",
head.method, uri, version, LINE_ENDING)); head.method, uri, version, LINE_ENDING) {
Err(e) => {
res = Err(From::from(e));
// TODO What should we do if the BufWriter doesn't wanna
// relinquish the stream?
return Stream::Idle(stream.into_inner().ok().unwrap());
},
Ok(_) => {},
};
let stream = { let stream = {
let mut write_headers = |mut stream: BufWriter<Box<NetworkStream + Send>>, head: &RequestHead| { let write_headers = |mut stream: BufWriter<Box<NetworkStream + Send>>, head: &RequestHead| {
debug!("headers={:?}", head.headers); debug!("headers={:?}", head.headers);
match write!(&mut stream, "{}{}", head.headers, LINE_ENDING) { match write!(&mut stream, "{}{}", head.headers, LINE_ENDING) {
Ok(_) => Ok(stream), Ok(_) => Ok(stream),
Err(e) => { Err(e) => {
self.stream = Some(stream.into_inner().unwrap()); Err((e, stream.into_inner().unwrap()))
Err(e)
} }
} }
}; };
match &head.method { match &head.method {
&Method::Get | &Method::Head => { &Method::Get | &Method::Head => {
EmptyWriter(try!(write_headers(stream, &head))) let writer = match write_headers(stream, &head) {
Ok(w) => w,
Err(e) => {
res = Err(From::from(e.0));
return Stream::Idle(e.1);
}
};
EmptyWriter(writer)
}, },
_ => { _ => {
let mut chunked = true; let mut chunked = true;
@@ -141,7 +212,13 @@ impl HttpMessage for Http11Message {
} }
} }
let stream = try!(write_headers(stream, &head)); let stream = match write_headers(stream, &head) {
Ok(s) => s,
Err(e) => {
res = Err(From::from(e.0));
return Stream::Idle(e.1);
},
};
if chunked { if chunked {
ChunkedWriter(stream) ChunkedWriter(stream)
@@ -152,22 +229,31 @@ impl HttpMessage for Http11Message {
} }
}; };
self.writer = Some(stream); method = Some(head.method.clone());
self.method = Some(head.method.clone()); res = Ok(head);
Stream::Writing(stream)
});
Ok(head) self.method = method;
res
} }
fn get_incoming(&mut self) -> ::Result<ResponseHead> { fn get_incoming(&mut self) -> ::Result<ResponseHead> {
try!(self.flush_outgoing()); try!(self.flush_outgoing());
let stream = match self.stream.take() { let method = self.method.take().unwrap_or(Method::Get);
Some(stream) => stream, let mut res = Err(From::from(
None => {
// The message was already in the reading state...
// TODO Decide what happens in case we try to get a new incoming at that point
return Err(From::from(
io::Error::new(io::ErrorKind::Other, io::Error::new(io::ErrorKind::Other,
"Read already in progress"))); "Read already in progress")));
self.stream.map_in_place(|stream| {
let stream = match stream {
Stream::Idle(stream) => stream,
_ => {
// The message was already in the reading state...
// TODO Decide what happens in case we try to get a new incoming at that point
res = Err(From::from(
io::Error::new(io::ErrorKind::Other,
"Read already in progress")));
return stream;
} }
}; };
@@ -189,8 +275,8 @@ impl HttpMessage for Http11Message {
continue; continue;
} }
Err(e) => { Err(e) => {
self.stream = Some(stream.into_inner()); res = Err(e);
return Err(e); return Stream::Idle(stream.into_inner());
} }
}; };
break; break;
@@ -199,8 +285,6 @@ impl HttpMessage for Http11Message {
let raw_status = head.subject; let raw_status = head.subject;
let headers = head.headers; let headers = head.headers;
let method = self.method.take().unwrap_or(Method::Get);
let is_empty = !should_have_response_body(&method, raw_status.0); let is_empty = !should_have_response_body(&method, raw_status.0);
stream.get_mut().set_previous_response_expected_no_content(is_empty); stream.get_mut().set_previous_response_expected_no_content(is_empty);
// According to https://tools.ietf.org/html/rfc7230#section-3.3.3 // According to https://tools.ietf.org/html/rfc7230#section-3.3.3
@@ -211,7 +295,7 @@ impl HttpMessage for Http11Message {
// 5. Content-Length header has a sized body. // 5. Content-Length header has a sized body.
// 6. Not Client. // 6. Not Client.
// 7. Read till EOF. // 7. Read till EOF.
self.reader = Some(if is_empty { let reader = if is_empty {
EmptyReader(stream) EmptyReader(stream)
} else { } else {
if let Some(&TransferEncoding(ref codings)) = headers.get() { if let Some(&TransferEncoding(ref codings)) = headers.get() {
@@ -225,29 +309,33 @@ impl HttpMessage for Http11Message {
SizedReader(stream, len) SizedReader(stream, len)
} else if headers.has::<ContentLength>() { } else if headers.has::<ContentLength>() {
trace!("illegal Content-Length: {:?}", headers.get_raw("Content-Length")); trace!("illegal Content-Length: {:?}", headers.get_raw("Content-Length"));
self.stream = Some(stream.into_inner()); res = Err(Error::Header);
return Err(Error::Header); return Stream::Idle(stream.into_inner());
} else { } else {
trace!("neither Transfer-Encoding nor Content-Length"); trace!("neither Transfer-Encoding nor Content-Length");
EofReader(stream) EofReader(stream)
} }
}); };
trace!("Http11Message.reader = {:?}", self.reader); trace!("Http11Message.reader = {:?}", reader);
Ok(ResponseHead { res = Ok(ResponseHead {
headers: headers, headers: headers,
raw_status: raw_status, raw_status: raw_status,
version: head.version, version: head.version,
}) });
Stream::Reading(reader)
});
res
} }
fn has_body(&self) -> bool { fn has_body(&self) -> bool {
match self.reader { match self.stream.as_ref().reader_ref() {
Some(EmptyReader(..)) | Some(&EmptyReader(..)) |
Some(SizedReader(_, 0)) | Some(&SizedReader(_, 0)) |
Some(ChunkedReader(_, Some(0))) => false, Some(&ChunkedReader(_, Some(0))) => false,
// specifically EofReader is always true // specifically EofReader is always true
_ => true _ => true
} }
@@ -274,43 +362,31 @@ impl HttpMessage for Http11Message {
impl Http11Message { impl Http11Message {
/// Consumes the `Http11Message` and returns the underlying `NetworkStream`. /// Consumes the `Http11Message` and returns the underlying `NetworkStream`.
pub fn into_inner(mut self) -> Box<NetworkStream + Send> { pub fn into_inner(self) -> Box<NetworkStream + Send> {
if self.stream.is_some() { match self.stream.into_inner() {
self.stream.take().unwrap() Stream::Idle(stream) => stream,
} else if self.writer.is_some() { Stream::Writing(stream) => stream.into_inner().into_inner().unwrap(),
self.writer.take().unwrap().into_inner().into_inner().unwrap() Stream::Reading(stream) => stream.into_inner().into_inner(),
} else if self.reader.is_some() {
self.reader.take().unwrap().into_inner().into_inner()
} else {
panic!("Http11Message lost its underlying stream somehow");
} }
} }
/// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the /// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the
/// `Http11Message`. /// `Http11Message`.
pub fn get_ref(&self) -> &(NetworkStream + Send) { pub fn get_ref(&self) -> &(NetworkStream + Send) {
if self.stream.is_some() { match *self.stream.as_ref() {
&**self.stream.as_ref().unwrap() Stream::Idle(ref stream) => &**stream,
} else if self.writer.is_some() { Stream::Writing(ref stream) => &**stream.get_ref().get_ref(),
&**self.writer.as_ref().unwrap().get_ref().get_ref() Stream::Reading(ref stream) => &**stream.get_ref().get_ref()
} else if self.reader.is_some() {
&**self.reader.as_ref().unwrap().get_ref().get_ref()
} else {
panic!("Http11Message lost its underlying stream somehow");
} }
} }
/// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the /// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the
/// `Http11Message`. /// `Http11Message`.
pub fn get_mut(&mut self) -> &mut (NetworkStream + Send) { pub fn get_mut(&mut self) -> &mut (NetworkStream + Send) {
if self.stream.is_some() { match *self.stream.as_mut() {
&mut **self.stream.as_mut().unwrap() Stream::Idle(ref mut stream) => &mut **stream,
} else if self.writer.is_some() { Stream::Writing(ref mut stream) => &mut **stream.get_mut().get_mut(),
&mut **self.writer.as_mut().unwrap().get_mut().get_mut() Stream::Reading(ref mut stream) => &mut **stream.get_mut().get_mut()
} else if self.reader.is_some() {
&mut **self.reader.as_mut().unwrap().get_mut().get_mut()
} else {
panic!("Http11Message lost its underlying stream somehow");
} }
} }
@@ -319,9 +395,7 @@ impl Http11Message {
pub fn with_stream(stream: Box<NetworkStream + Send>) -> Http11Message { pub fn with_stream(stream: Box<NetworkStream + Send>) -> Http11Message {
Http11Message { Http11Message {
method: None, method: None,
stream: Some(stream), stream: Wrapper::new(Stream::new(stream)),
writer: None,
reader: None,
} }
} }
@@ -329,22 +403,26 @@ impl Http11Message {
/// ///
/// TODO It might be sensible to lift this up to the `HttpMessage` trait itself... /// TODO It might be sensible to lift this up to the `HttpMessage` trait itself...
pub fn flush_outgoing(&mut self) -> ::Result<()> { pub fn flush_outgoing(&mut self) -> ::Result<()> {
match self.writer { let mut res = Ok(());
None => return Ok(()), self.stream.map_in_place(|stream| {
Some(_) => {}, let writer = match stream {
Stream::Writing(writer) => writer,
_ => {
res = Ok(());
return stream;
},
}; };
let writer = self.writer.take().unwrap();
// end() already flushes // end() already flushes
let raw = match writer.end() { let raw = match writer.end() {
Ok(buf) => buf.into_inner().unwrap(), Ok(buf) => buf.into_inner().unwrap(),
Err(e) => { Err(e) => {
self.writer = Some(e.1); res = Err(From::from(e.0));
return Err(From::from(e.0)); return Stream::Writing(e.1);
} }
}; };
self.stream = Some(raw); Stream::Idle(raw)
Ok(()) });
res
} }
} }