fix(http): Add a stream enum that makes it impossible to lose a stream
This removes a number of possible panics...
This commit is contained in:
466
src/http/h1.rs
466
src/http/h1.rs
@@ -35,19 +35,70 @@ use version;
|
|||||||
|
|
||||||
const MAX_INVALID_RESPONSE_BYTES: usize = 1024 * 128;
|
const MAX_INVALID_RESPONSE_BYTES: usize = 1024 * 128;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Wrapper<T> {
|
||||||
|
obj: Option<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Wrapper<T> {
|
||||||
|
pub fn new(obj: T) -> Wrapper<T> {
|
||||||
|
Wrapper { obj: Some(obj) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn map_in_place<F>(&mut self, f: F) where F: FnOnce(T) -> T {
|
||||||
|
let obj = self.obj.take().unwrap();
|
||||||
|
let res = f(obj);
|
||||||
|
self.obj = Some(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn into_inner(self) -> T { self.obj.unwrap() }
|
||||||
|
pub fn as_mut(&mut self) -> &mut T { self.obj.as_mut().unwrap() }
|
||||||
|
pub fn as_ref(&self) -> &T { self.obj.as_ref().unwrap() }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum Stream {
|
||||||
|
Idle(Box<NetworkStream + Send>),
|
||||||
|
Writing(HttpWriter<BufWriter<Box<NetworkStream + Send>>>),
|
||||||
|
Reading(HttpReader<BufReader<Box<NetworkStream + Send>>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream {
|
||||||
|
fn writer_mut(&mut self) -> Option<&mut HttpWriter<BufWriter<Box<NetworkStream + Send>>>> {
|
||||||
|
match *self {
|
||||||
|
Stream::Writing(ref mut writer) => Some(writer),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn reader_mut(&mut self) -> Option<&mut HttpReader<BufReader<Box<NetworkStream + Send>>>> {
|
||||||
|
match *self {
|
||||||
|
Stream::Reading(ref mut reader) => Some(reader),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn reader_ref(&self) -> Option<&HttpReader<BufReader<Box<NetworkStream + Send>>>> {
|
||||||
|
match *self {
|
||||||
|
Stream::Reading(ref reader) => Some(reader),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new(stream: Box<NetworkStream + Send>) -> Stream {
|
||||||
|
Stream::Idle(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// An implementation of the `HttpMessage` trait for HTTP/1.1.
|
/// An implementation of the `HttpMessage` trait for HTTP/1.1.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Http11Message {
|
pub struct Http11Message {
|
||||||
method: Option<Method>,
|
method: Option<Method>,
|
||||||
stream: Option<Box<NetworkStream + Send>>,
|
stream: Wrapper<Stream>,
|
||||||
writer: Option<HttpWriter<BufWriter<Box<NetworkStream + Send>>>>,
|
|
||||||
reader: Option<HttpReader<BufReader<Box<NetworkStream + Send>>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Write for Http11Message {
|
impl Write for Http11Message {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
match self.writer {
|
match self.stream.as_mut().writer_mut() {
|
||||||
None => Err(io::Error::new(io::ErrorKind::Other,
|
None => Err(io::Error::new(io::ErrorKind::Other,
|
||||||
"Not in a writable state")),
|
"Not in a writable state")),
|
||||||
Some(ref mut writer) => writer.write(buf),
|
Some(ref mut writer) => writer.write(buf),
|
||||||
@@ -55,7 +106,7 @@ impl Write for Http11Message {
|
|||||||
}
|
}
|
||||||
#[inline]
|
#[inline]
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
match self.writer {
|
match self.stream.as_mut().writer_mut() {
|
||||||
None => Err(io::Error::new(io::ErrorKind::Other,
|
None => Err(io::Error::new(io::ErrorKind::Other,
|
||||||
"Not in a writable state")),
|
"Not in a writable state")),
|
||||||
Some(ref mut writer) => writer.flush(),
|
Some(ref mut writer) => writer.flush(),
|
||||||
@@ -66,7 +117,7 @@ impl Write for Http11Message {
|
|||||||
impl Read for Http11Message {
|
impl Read for Http11Message {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
match self.reader {
|
match self.stream.as_mut().reader_mut() {
|
||||||
None => Err(io::Error::new(io::ErrorKind::Other,
|
None => Err(io::Error::new(io::ErrorKind::Other,
|
||||||
"Not in a readable state")),
|
"Not in a readable state")),
|
||||||
Some(ref mut reader) => reader.read(buf),
|
Some(ref mut reader) => reader.read(buf),
|
||||||
@@ -76,178 +127,215 @@ impl Read for Http11Message {
|
|||||||
|
|
||||||
impl HttpMessage for Http11Message {
|
impl HttpMessage for Http11Message {
|
||||||
fn set_outgoing(&mut self, mut head: RequestHead) -> ::Result<RequestHead> {
|
fn set_outgoing(&mut self, mut head: RequestHead) -> ::Result<RequestHead> {
|
||||||
let stream = match self.stream.take() {
|
let mut res = Err(Error::from(io::Error::new(
|
||||||
Some(stream) => stream,
|
|
||||||
None => {
|
|
||||||
return Err(From::from(io::Error::new(
|
|
||||||
io::ErrorKind::Other,
|
io::ErrorKind::Other,
|
||||||
"Message not idle, cannot start new outgoing")));
|
"")));
|
||||||
|
let mut method = None;
|
||||||
|
self.stream.map_in_place(|stream: Stream| -> Stream {
|
||||||
|
let stream = match stream {
|
||||||
|
Stream::Idle(stream) => stream,
|
||||||
|
_ => {
|
||||||
|
res = Err(Error::from(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
"Message not idle, cannot start new outgoing")));
|
||||||
|
return stream;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
let mut stream = BufWriter::new(stream);
|
||||||
|
|
||||||
|
let mut uri = head.url.serialize_path().unwrap();
|
||||||
|
if let Some(ref q) = head.url.query {
|
||||||
|
uri.push('?');
|
||||||
|
uri.push_str(&q[..]);
|
||||||
}
|
}
|
||||||
};
|
|
||||||
let mut stream = BufWriter::new(stream);
|
|
||||||
|
|
||||||
let mut uri = head.url.serialize_path().unwrap();
|
let version = version::HttpVersion::Http11;
|
||||||
if let Some(ref q) = head.url.query {
|
debug!("request line: {:?} {:?} {:?}", head.method, uri, version);
|
||||||
uri.push('?');
|
match write!(&mut stream, "{} {} {}{}",
|
||||||
uri.push_str(&q[..]);
|
head.method, uri, version, LINE_ENDING) {
|
||||||
}
|
Err(e) => {
|
||||||
|
res = Err(From::from(e));
|
||||||
|
// TODO What should we do if the BufWriter doesn't wanna
|
||||||
|
// relinquish the stream?
|
||||||
|
return Stream::Idle(stream.into_inner().ok().unwrap());
|
||||||
|
},
|
||||||
|
Ok(_) => {},
|
||||||
|
};
|
||||||
|
|
||||||
let version = version::HttpVersion::Http11;
|
let stream = {
|
||||||
debug!("request line: {:?} {:?} {:?}", head.method, uri, version);
|
let write_headers = |mut stream: BufWriter<Box<NetworkStream + Send>>, head: &RequestHead| {
|
||||||
try!(write!(&mut stream, "{} {} {}{}",
|
debug!("headers={:?}", head.headers);
|
||||||
head.method, uri, version, LINE_ENDING));
|
match write!(&mut stream, "{}{}", head.headers, LINE_ENDING) {
|
||||||
|
Ok(_) => Ok(stream),
|
||||||
|
Err(e) => {
|
||||||
|
Err((e, stream.into_inner().unwrap()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
match &head.method {
|
||||||
|
&Method::Get | &Method::Head => {
|
||||||
|
let writer = match write_headers(stream, &head) {
|
||||||
|
Ok(w) => w,
|
||||||
|
Err(e) => {
|
||||||
|
res = Err(From::from(e.0));
|
||||||
|
return Stream::Idle(e.1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
EmptyWriter(writer)
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
let mut chunked = true;
|
||||||
|
let mut len = 0;
|
||||||
|
|
||||||
let stream = {
|
match head.headers.get::<header::ContentLength>() {
|
||||||
let mut write_headers = |mut stream: BufWriter<Box<NetworkStream + Send>>, head: &RequestHead| {
|
Some(cl) => {
|
||||||
debug!("headers={:?}", head.headers);
|
chunked = false;
|
||||||
match write!(&mut stream, "{}{}", head.headers, LINE_ENDING) {
|
len = **cl;
|
||||||
Ok(_) => Ok(stream),
|
},
|
||||||
Err(e) => {
|
None => ()
|
||||||
self.stream = Some(stream.into_inner().unwrap());
|
};
|
||||||
Err(e)
|
|
||||||
|
// can't do in match above, thanks borrowck
|
||||||
|
if chunked {
|
||||||
|
let encodings = match head.headers.get_mut::<header::TransferEncoding>() {
|
||||||
|
Some(encodings) => {
|
||||||
|
//TODO: check if chunked is already in encodings. use HashSet?
|
||||||
|
encodings.push(header::Encoding::Chunked);
|
||||||
|
false
|
||||||
|
},
|
||||||
|
None => true
|
||||||
|
};
|
||||||
|
|
||||||
|
if encodings {
|
||||||
|
head.headers.set(
|
||||||
|
header::TransferEncoding(vec![header::Encoding::Chunked]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let stream = match write_headers(stream, &head) {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
res = Err(From::from(e.0));
|
||||||
|
return Stream::Idle(e.1);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
if chunked {
|
||||||
|
ChunkedWriter(stream)
|
||||||
|
} else {
|
||||||
|
SizedWriter(stream, len)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
match &head.method {
|
|
||||||
&Method::Get | &Method::Head => {
|
|
||||||
EmptyWriter(try!(write_headers(stream, &head)))
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
let mut chunked = true;
|
|
||||||
let mut len = 0;
|
|
||||||
|
|
||||||
match head.headers.get::<header::ContentLength>() {
|
method = Some(head.method.clone());
|
||||||
Some(cl) => {
|
res = Ok(head);
|
||||||
chunked = false;
|
Stream::Writing(stream)
|
||||||
len = **cl;
|
});
|
||||||
},
|
|
||||||
None => ()
|
|
||||||
};
|
|
||||||
|
|
||||||
// can't do in match above, thanks borrowck
|
self.method = method;
|
||||||
if chunked {
|
res
|
||||||
let encodings = match head.headers.get_mut::<header::TransferEncoding>() {
|
|
||||||
Some(encodings) => {
|
|
||||||
//TODO: check if chunked is already in encodings. use HashSet?
|
|
||||||
encodings.push(header::Encoding::Chunked);
|
|
||||||
false
|
|
||||||
},
|
|
||||||
None => true
|
|
||||||
};
|
|
||||||
|
|
||||||
if encodings {
|
|
||||||
head.headers.set(
|
|
||||||
header::TransferEncoding(vec![header::Encoding::Chunked]))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let stream = try!(write_headers(stream, &head));
|
|
||||||
|
|
||||||
if chunked {
|
|
||||||
ChunkedWriter(stream)
|
|
||||||
} else {
|
|
||||||
SizedWriter(stream, len)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
self.writer = Some(stream);
|
|
||||||
self.method = Some(head.method.clone());
|
|
||||||
|
|
||||||
Ok(head)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_incoming(&mut self) -> ::Result<ResponseHead> {
|
fn get_incoming(&mut self) -> ::Result<ResponseHead> {
|
||||||
try!(self.flush_outgoing());
|
try!(self.flush_outgoing());
|
||||||
let stream = match self.stream.take() {
|
let method = self.method.take().unwrap_or(Method::Get);
|
||||||
Some(stream) => stream,
|
let mut res = Err(From::from(
|
||||||
None => {
|
|
||||||
// The message was already in the reading state...
|
|
||||||
// TODO Decide what happens in case we try to get a new incoming at that point
|
|
||||||
return Err(From::from(
|
|
||||||
io::Error::new(io::ErrorKind::Other,
|
io::Error::new(io::ErrorKind::Other,
|
||||||
"Read already in progress")));
|
"Read already in progress")));
|
||||||
}
|
self.stream.map_in_place(|stream| {
|
||||||
};
|
let stream = match stream {
|
||||||
|
Stream::Idle(stream) => stream,
|
||||||
let expected_no_content = stream.previous_response_expected_no_content();
|
_ => {
|
||||||
trace!("previous_response_expected_no_content = {}", expected_no_content);
|
// The message was already in the reading state...
|
||||||
|
// TODO Decide what happens in case we try to get a new incoming at that point
|
||||||
let mut stream = BufReader::new(stream);
|
res = Err(From::from(
|
||||||
|
io::Error::new(io::ErrorKind::Other,
|
||||||
let mut invalid_bytes_read = 0;
|
"Read already in progress")));
|
||||||
let head;
|
return stream;
|
||||||
loop {
|
|
||||||
head = match parse_response(&mut stream) {
|
|
||||||
Ok(head) => head,
|
|
||||||
Err(::Error::Version)
|
|
||||||
if expected_no_content && invalid_bytes_read < MAX_INVALID_RESPONSE_BYTES => {
|
|
||||||
trace!("expected_no_content, found content");
|
|
||||||
invalid_bytes_read += 1;
|
|
||||||
stream.consume(1);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
self.stream = Some(stream.into_inner());
|
|
||||||
return Err(e);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let raw_status = head.subject;
|
let expected_no_content = stream.previous_response_expected_no_content();
|
||||||
let headers = head.headers;
|
trace!("previous_response_expected_no_content = {}", expected_no_content);
|
||||||
|
|
||||||
let method = self.method.take().unwrap_or(Method::Get);
|
let mut stream = BufReader::new(stream);
|
||||||
|
|
||||||
let is_empty = !should_have_response_body(&method, raw_status.0);
|
let mut invalid_bytes_read = 0;
|
||||||
stream.get_mut().set_previous_response_expected_no_content(is_empty);
|
let head;
|
||||||
// According to https://tools.ietf.org/html/rfc7230#section-3.3.3
|
loop {
|
||||||
// 1. HEAD reponses, and Status 1xx, 204, and 304 cannot have a body.
|
head = match parse_response(&mut stream) {
|
||||||
// 2. Status 2xx to a CONNECT cannot have a body.
|
Ok(head) => head,
|
||||||
// 3. Transfer-Encoding: chunked has a chunked body.
|
Err(::Error::Version)
|
||||||
// 4. If multiple differing Content-Length headers or invalid, close connection.
|
if expected_no_content && invalid_bytes_read < MAX_INVALID_RESPONSE_BYTES => {
|
||||||
// 5. Content-Length header has a sized body.
|
trace!("expected_no_content, found content");
|
||||||
// 6. Not Client.
|
invalid_bytes_read += 1;
|
||||||
// 7. Read till EOF.
|
stream.consume(1);
|
||||||
self.reader = Some(if is_empty {
|
continue;
|
||||||
EmptyReader(stream)
|
}
|
||||||
} else {
|
Err(e) => {
|
||||||
if let Some(&TransferEncoding(ref codings)) = headers.get() {
|
res = Err(e);
|
||||||
if codings.last() == Some(&Chunked) {
|
return Stream::Idle(stream.into_inner());
|
||||||
ChunkedReader(stream, None)
|
}
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let raw_status = head.subject;
|
||||||
|
let headers = head.headers;
|
||||||
|
|
||||||
|
let is_empty = !should_have_response_body(&method, raw_status.0);
|
||||||
|
stream.get_mut().set_previous_response_expected_no_content(is_empty);
|
||||||
|
// According to https://tools.ietf.org/html/rfc7230#section-3.3.3
|
||||||
|
// 1. HEAD reponses, and Status 1xx, 204, and 304 cannot have a body.
|
||||||
|
// 2. Status 2xx to a CONNECT cannot have a body.
|
||||||
|
// 3. Transfer-Encoding: chunked has a chunked body.
|
||||||
|
// 4. If multiple differing Content-Length headers or invalid, close connection.
|
||||||
|
// 5. Content-Length header has a sized body.
|
||||||
|
// 6. Not Client.
|
||||||
|
// 7. Read till EOF.
|
||||||
|
let reader = if is_empty {
|
||||||
|
EmptyReader(stream)
|
||||||
|
} else {
|
||||||
|
if let Some(&TransferEncoding(ref codings)) = headers.get() {
|
||||||
|
if codings.last() == Some(&Chunked) {
|
||||||
|
ChunkedReader(stream, None)
|
||||||
|
} else {
|
||||||
|
trace!("not chuncked. read till eof");
|
||||||
|
EofReader(stream)
|
||||||
|
}
|
||||||
|
} else if let Some(&ContentLength(len)) = headers.get() {
|
||||||
|
SizedReader(stream, len)
|
||||||
|
} else if headers.has::<ContentLength>() {
|
||||||
|
trace!("illegal Content-Length: {:?}", headers.get_raw("Content-Length"));
|
||||||
|
res = Err(Error::Header);
|
||||||
|
return Stream::Idle(stream.into_inner());
|
||||||
} else {
|
} else {
|
||||||
trace!("not chuncked. read till eof");
|
trace!("neither Transfer-Encoding nor Content-Length");
|
||||||
EofReader(stream)
|
EofReader(stream)
|
||||||
}
|
}
|
||||||
} else if let Some(&ContentLength(len)) = headers.get() {
|
};
|
||||||
SizedReader(stream, len)
|
|
||||||
} else if headers.has::<ContentLength>() {
|
trace!("Http11Message.reader = {:?}", reader);
|
||||||
trace!("illegal Content-Length: {:?}", headers.get_raw("Content-Length"));
|
|
||||||
self.stream = Some(stream.into_inner());
|
|
||||||
return Err(Error::Header);
|
res = Ok(ResponseHead {
|
||||||
} else {
|
headers: headers,
|
||||||
trace!("neither Transfer-Encoding nor Content-Length");
|
raw_status: raw_status,
|
||||||
EofReader(stream)
|
version: head.version,
|
||||||
}
|
});
|
||||||
|
|
||||||
|
Stream::Reading(reader)
|
||||||
});
|
});
|
||||||
|
res
|
||||||
trace!("Http11Message.reader = {:?}", self.reader);
|
|
||||||
|
|
||||||
|
|
||||||
Ok(ResponseHead {
|
|
||||||
headers: headers,
|
|
||||||
raw_status: raw_status,
|
|
||||||
version: head.version,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn has_body(&self) -> bool {
|
fn has_body(&self) -> bool {
|
||||||
match self.reader {
|
match self.stream.as_ref().reader_ref() {
|
||||||
Some(EmptyReader(..)) |
|
Some(&EmptyReader(..)) |
|
||||||
Some(SizedReader(_, 0)) |
|
Some(&SizedReader(_, 0)) |
|
||||||
Some(ChunkedReader(_, Some(0))) => false,
|
Some(&ChunkedReader(_, Some(0))) => false,
|
||||||
// specifically EofReader is always true
|
// specifically EofReader is always true
|
||||||
_ => true
|
_ => true
|
||||||
}
|
}
|
||||||
@@ -274,43 +362,31 @@ impl HttpMessage for Http11Message {
|
|||||||
|
|
||||||
impl Http11Message {
|
impl Http11Message {
|
||||||
/// Consumes the `Http11Message` and returns the underlying `NetworkStream`.
|
/// Consumes the `Http11Message` and returns the underlying `NetworkStream`.
|
||||||
pub fn into_inner(mut self) -> Box<NetworkStream + Send> {
|
pub fn into_inner(self) -> Box<NetworkStream + Send> {
|
||||||
if self.stream.is_some() {
|
match self.stream.into_inner() {
|
||||||
self.stream.take().unwrap()
|
Stream::Idle(stream) => stream,
|
||||||
} else if self.writer.is_some() {
|
Stream::Writing(stream) => stream.into_inner().into_inner().unwrap(),
|
||||||
self.writer.take().unwrap().into_inner().into_inner().unwrap()
|
Stream::Reading(stream) => stream.into_inner().into_inner(),
|
||||||
} else if self.reader.is_some() {
|
|
||||||
self.reader.take().unwrap().into_inner().into_inner()
|
|
||||||
} else {
|
|
||||||
panic!("Http11Message lost its underlying stream somehow");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the
|
/// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the
|
||||||
/// `Http11Message`.
|
/// `Http11Message`.
|
||||||
pub fn get_ref(&self) -> &(NetworkStream + Send) {
|
pub fn get_ref(&self) -> &(NetworkStream + Send) {
|
||||||
if self.stream.is_some() {
|
match *self.stream.as_ref() {
|
||||||
&**self.stream.as_ref().unwrap()
|
Stream::Idle(ref stream) => &**stream,
|
||||||
} else if self.writer.is_some() {
|
Stream::Writing(ref stream) => &**stream.get_ref().get_ref(),
|
||||||
&**self.writer.as_ref().unwrap().get_ref().get_ref()
|
Stream::Reading(ref stream) => &**stream.get_ref().get_ref()
|
||||||
} else if self.reader.is_some() {
|
|
||||||
&**self.reader.as_ref().unwrap().get_ref().get_ref()
|
|
||||||
} else {
|
|
||||||
panic!("Http11Message lost its underlying stream somehow");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the
|
/// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the
|
||||||
/// `Http11Message`.
|
/// `Http11Message`.
|
||||||
pub fn get_mut(&mut self) -> &mut (NetworkStream + Send) {
|
pub fn get_mut(&mut self) -> &mut (NetworkStream + Send) {
|
||||||
if self.stream.is_some() {
|
match *self.stream.as_mut() {
|
||||||
&mut **self.stream.as_mut().unwrap()
|
Stream::Idle(ref mut stream) => &mut **stream,
|
||||||
} else if self.writer.is_some() {
|
Stream::Writing(ref mut stream) => &mut **stream.get_mut().get_mut(),
|
||||||
&mut **self.writer.as_mut().unwrap().get_mut().get_mut()
|
Stream::Reading(ref mut stream) => &mut **stream.get_mut().get_mut()
|
||||||
} else if self.reader.is_some() {
|
|
||||||
&mut **self.reader.as_mut().unwrap().get_mut().get_mut()
|
|
||||||
} else {
|
|
||||||
panic!("Http11Message lost its underlying stream somehow");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -319,9 +395,7 @@ impl Http11Message {
|
|||||||
pub fn with_stream(stream: Box<NetworkStream + Send>) -> Http11Message {
|
pub fn with_stream(stream: Box<NetworkStream + Send>) -> Http11Message {
|
||||||
Http11Message {
|
Http11Message {
|
||||||
method: None,
|
method: None,
|
||||||
stream: Some(stream),
|
stream: Wrapper::new(Stream::new(stream)),
|
||||||
writer: None,
|
|
||||||
reader: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -329,22 +403,26 @@ impl Http11Message {
|
|||||||
///
|
///
|
||||||
/// TODO It might be sensible to lift this up to the `HttpMessage` trait itself...
|
/// TODO It might be sensible to lift this up to the `HttpMessage` trait itself...
|
||||||
pub fn flush_outgoing(&mut self) -> ::Result<()> {
|
pub fn flush_outgoing(&mut self) -> ::Result<()> {
|
||||||
match self.writer {
|
let mut res = Ok(());
|
||||||
None => return Ok(()),
|
self.stream.map_in_place(|stream| {
|
||||||
Some(_) => {},
|
let writer = match stream {
|
||||||
};
|
Stream::Writing(writer) => writer,
|
||||||
|
_ => {
|
||||||
let writer = self.writer.take().unwrap();
|
res = Ok(());
|
||||||
// end() already flushes
|
return stream;
|
||||||
let raw = match writer.end() {
|
},
|
||||||
Ok(buf) => buf.into_inner().unwrap(),
|
};
|
||||||
Err(e) => {
|
// end() already flushes
|
||||||
self.writer = Some(e.1);
|
let raw = match writer.end() {
|
||||||
return Err(From::from(e.0));
|
Ok(buf) => buf.into_inner().unwrap(),
|
||||||
}
|
Err(e) => {
|
||||||
};
|
res = Err(From::from(e.0));
|
||||||
self.stream = Some(raw);
|
return Stream::Writing(e.1);
|
||||||
Ok(())
|
}
|
||||||
|
};
|
||||||
|
Stream::Idle(raw)
|
||||||
|
});
|
||||||
|
res
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user