Process response

This commit is contained in:
Carl Lerche
2017-06-27 01:34:37 -07:00
parent 7897b770e9
commit 1f85d54cff
8 changed files with 112 additions and 13 deletions

View File

@@ -96,8 +96,8 @@ impl Peer for Client {
frame
}
fn convert_poll_message(headers: frame::Headers) -> Frame<Self::Poll> {
unimplemented!();
fn convert_poll_message(headers: frame::Headers) -> Self::Poll {
headers.into_response()
}
}

View File

@@ -1,4 +1,4 @@
use frame::{util, Head, Error, StreamId, Kind};
use frame::{util, Frame, Head, Error, StreamId, Kind};
use bytes::{BufMut, Bytes};
#[derive(Debug)]
@@ -35,10 +35,18 @@ impl Data {
})
}
pub fn stream_id(&self) -> StreamId {
self.stream_id
}
pub fn len(&self) -> usize {
self.data.len()
}
pub fn is_end_stream(&self) -> bool {
self.flags.is_end_stream()
}
pub fn encode<T: BufMut>(&self, dst: &mut T) {
self.head().encode(self.len(), dst);
dst.put(&self.data);
@@ -53,6 +61,13 @@ impl Data {
}
}
impl From<Data> for Frame {
fn from(src: Data) -> Frame {
Frame::Data(src)
}
}
// ===== impl DataFlag =====
impl DataFlag {
pub fn load(bits: u8) -> DataFlag {

View File

@@ -4,7 +4,7 @@ use error::Reason;
use frame::{self, Frame, Head, Kind, Error};
use util::byte_str::ByteStr;
use http::{Method, StatusCode};
use http::{request, response, Method, StatusCode};
use http::header::{self, HeaderMap, HeaderName, HeaderValue};
use bytes::{BytesMut, Bytes};
@@ -172,14 +172,35 @@ impl Headers {
})
}
pub fn stream_id(&self) -> StreamId {
self.stream_id
}
pub fn is_end_headers(&self) -> bool {
self.flags.is_end_headers()
}
pub fn is_end_stream(&self) -> bool {
self.flags.is_end_stream()
}
pub fn set_end_stream(&mut self) {
self.flags.set_end_stream()
}
pub fn into_response(self) -> response::Head {
let mut response = response::Head::default();
if let Some(status) = self.pseudo.status {
response.status = status;
} else {
unimplemented!();
}
response.headers = self.fields;
response
}
pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut)
-> Option<Continuation>
{

View File

@@ -31,6 +31,7 @@ mod data;
mod go_away;
mod head;
mod headers;
mod reset;
mod settings;
mod util;
@@ -38,6 +39,7 @@ pub use self::data::Data;
pub use self::go_away::GoAway;
pub use self::head::{Head, Kind, StreamId};
pub use self::headers::{Headers, PushPromise, Continuation, Pseudo};
pub use self::reset::Reset;
pub use self::settings::{Settings, SettingSet};
// Re-export some constants

25
src/frame/reset.rs Normal file
View File

@@ -0,0 +1,25 @@
use frame::{Head, Error};
use super::{head, StreamId};
#[derive(Debug)]
pub struct Reset {
stream_id: StreamId,
error_code: u32,
}
impl Reset {
pub fn load(head: Head, payload: &[u8]) -> Result<Reset, Error> {
if payload.len() != 4 {
// Invalid payload len
// TODO: Handle error
unimplemented!();
}
let error_code = unpack_octets_4!(payload, 0, u32);
Ok(Reset {
stream_id: head.stream_id(),
error_code: error_code,
})
}
}

View File

@@ -36,6 +36,8 @@ pub use error::{ConnectionError, StreamError, Reason};
pub use frame::{StreamId};
pub use proto::Connection;
use bytes::Bytes;
/// An H2 connection frame
#[derive(Debug)]
pub enum Frame<T> {
@@ -46,7 +48,7 @@ pub enum Frame<T> {
},
Body {
id: StreamId,
chunk: (),
chunk: Bytes,
end_of_stream: bool,
},
Trailers {
@@ -80,5 +82,5 @@ pub trait Peer {
end_of_stream: bool) -> frame::Headers;
#[doc(hidden)]
fn convert_poll_message(headers: frame::Headers) -> Frame<Self::Poll>;
fn convert_poll_message(headers: frame::Headers) -> Self::Poll;
}

View File

@@ -79,15 +79,36 @@ impl<T, P> Stream for Connection<T, P>
// buffer is clear, `poll_complete` is called here.
let _ = try!(self.poll_complete());
match try_ready!(self.inner.poll()) {
let frame = match try_ready!(self.inner.poll()) {
Some(Headers(v)) => {
debug!("poll; frame={:?}", v);
unimplemented!();
// TODO: Update stream state
let stream_id = v.stream_id();
let end_of_stream = v.is_end_stream();
Frame::Headers {
id: stream_id,
headers: P::convert_poll_message(v),
end_of_stream: end_of_stream,
}
}
Some(Data(v)) => {
// TODO: Validate frame
let stream_id = v.stream_id();
let end_of_stream = v.is_end_stream();
Frame::Body {
id: stream_id,
chunk: v.into_payload(),
end_of_stream: end_of_stream,
}
}
Some(frame) => panic!("unexpected frame; frame={:?}", frame),
None => return Ok(Async::Ready(None)),
_ => unimplemented!(),
}
};
Ok(Async::Ready(Some(frame)))
}
}

View File

@@ -49,7 +49,11 @@ impl<T> FramedRead<T> {
}
let frame = match head.kind() {
Kind::Data => unimplemented!(),
Kind::Data => {
let _ = bytes.split_to(frame::HEADER_LEN);
let frame = try!(frame::Data::load(head, bytes));
frame.into()
}
Kind::Headers => {
let mut buf = Cursor::new(bytes);
buf.set_position(frame::HEADER_LEN as u64);
@@ -66,12 +70,21 @@ impl<T> FramedRead<T> {
frame.into()
}
Kind::Priority => unimplemented!(),
Kind::Reset => unimplemented!(),
Kind::Reset => {
let frame = try!(frame::Reset::load(head, &bytes[frame::HEADER_LEN..]));
debug!("decoded; frame={:?}", frame);
// TODO: implement
return Ok(None);
}
Kind::Settings => {
let frame = try!(frame::Settings::load(head, &bytes[frame::HEADER_LEN..]));
frame.into()
}
Kind::PushPromise => unimplemented!(),
Kind::PushPromise => {
debug!("received PUSH_PROMISE");
// TODO: implement
return Ok(None);
}
Kind::Ping => unimplemented!(),
Kind::GoAway => {
let frame = try!(frame::GoAway::load(&bytes[frame::HEADER_LEN..]));