More work

This commit is contained in:
Carl Lerche
2017-08-04 12:12:22 -07:00
parent 74b3852a58
commit fc0a7eb898
9 changed files with 264 additions and 106 deletions

View File

@@ -25,6 +25,7 @@ use error::Reason::*;
use error::User::*;
use http::{Request, Response};
use bytes::Bytes;
#[derive(Debug)]
pub struct Config {

View File

@@ -14,7 +14,9 @@ struct Indices {
tail: store::Key,
}
impl<B> Prioritize<B> {
impl<B> Prioritize<B>
where B: Buf,
{
pub fn new() -> Prioritize<B> {
Prioritize {
pending_send: None,
@@ -58,4 +60,34 @@ impl<B> Prioritize<B> {
stream.is_pending_send = true;
}
pub fn poll_complete<T>(&mut self,
store: &mut Store<B>,
dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
{
loop {
// Ensure codec is ready
try_ready!(dst.poll_ready());
match self.pop_frame(store) {
Some(frame) => {
// TODO: data frames should be handled specially...
let res = dst.start_send(frame)?;
// We already verified that `dst` is ready to accept the
// write
assert!(res.is_ready());
}
None => break,
}
}
Ok(().into())
}
fn pop_frame(&mut self, store: &mut Store<B>) -> Option<Frame<B>> {
unimplemented!();
}
}

View File

@@ -1,4 +1,4 @@
use {frame, ConnectionError};
use {client, frame, ConnectionError};
use proto::*;
use super::*;
@@ -23,6 +23,9 @@ pub(super) struct Recv<P, B> {
pending_window_updates: VecDeque<StreamId>,
/// Holds frames that are waiting to be read
buffer: Buffer<Bytes>,
/// Refused StreamId, this represents a frame that must be sent out.
refused: Option<StreamId>,
@@ -40,6 +43,7 @@ impl<P, B> Recv<P, B>
init_window_sz: config.init_remote_window_sz,
flow_control: FlowControl::new(config.init_remote_window_sz),
pending_window_updates: VecDeque::new(),
buffer: Buffer::new(),
refused: None,
_p: PhantomData,
}
@@ -67,10 +71,24 @@ impl<P, B> Recv<P, B>
}
/// Transition the stream state based on receiving headers
pub fn recv_headers(&mut self, stream: &mut Stream<B>, eos: bool)
-> Result<(), ConnectionError>
pub fn recv_headers(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B>)
-> Result<Option<frame::Headers>, ConnectionError>
{
stream.state.recv_open(self.init_window_sz, eos)
stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?;
// Only servers can receive a headers frame that initiates the stream.
// This is verified in `Streams` before calling this function.
if P::is_server() {
Ok(Some(frame))
} else {
// Push the frame onto the recv buffer
stream.pending_recv.push_back(&mut self.buffer, frame.into());
stream.notify_recv();
Ok(None)
}
}
pub fn recv_eos(&mut self, stream: &mut Stream<B>)
@@ -233,3 +251,25 @@ impl<P, B> Recv<P, B>
unimplemented!();
}
}
impl<B> Recv<client::Peer, B>
where B: Buf,
{
pub fn poll_response(&mut self, stream: &mut store::Ptr<B>)
-> Poll<Response<()>, ConnectionError> {
// If the buffer is not empty, then the first frame must be a HEADERS
// frame or the user violated the contract.
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Frame::Headers(v)) => {
// TODO: This error should probably be caught on receipt of the
// frame vs. now.
Ok(client::Peer::convert_poll_message(v)?.into())
}
Some(frame) => unimplemented!(),
None => {
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
}
}
}
}

View File

@@ -151,6 +151,15 @@ impl<P, B> Send<P, B>
Ok(())
}
pub fn poll_complete<T>(&mut self,
store: &mut Store<B>,
dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
{
self.prioritize.poll_complete(store, dst)
}
/// Get pending window updates
pub fn poll_window_update(&mut self, streams: &mut Store<B>)
-> Poll<WindowUpdate, ConnectionError>

View File

@@ -47,6 +47,13 @@ impl<B> Store<B> {
}
}
pub fn resolve(&mut self, key: Key) -> Ptr<B> {
Ptr {
key: key,
store: self,
}
}
pub fn find_mut(&mut self, id: &StreamId) -> Option<&mut Stream<B>> {
if let Some(handle) = self.ids.get(id) {
Some(&mut self.slab[*handle])
@@ -117,6 +124,10 @@ impl<'a, B: 'a> ops::DerefMut for Ptr<'a, B> {
// ===== impl OccupiedEntry =====
impl<'a, B> OccupiedEntry<'a, B> {
pub fn key(&self) -> Key {
Key(*self.ids.get())
}
pub fn get(&self) -> &Stream<B> {
&self.slab[*self.ids.get()]
}
@@ -133,13 +144,13 @@ impl<'a, B> OccupiedEntry<'a, B> {
// ===== impl VacantEntry =====
//
impl<'a, B> VacantEntry<'a, B> {
pub fn insert(self, value: Stream<B>) -> &'a mut Stream<B> {
pub fn insert(self, value: Stream<B>) -> Key {
// Insert the value in the slab
let handle = self.slab.insert(value);
let key = self.slab.insert(value);
// Insert the handle in the ID map
self.ids.insert(handle);
self.ids.insert(key);
&mut self.slab[handle]
Key(key)
}
}

View File

@@ -5,6 +5,12 @@ pub(super) struct Stream<B> {
/// Current state of the stream
pub state: State,
/// Frames pending for this stream to read
pub pending_recv: buffer::Deque<Bytes>,
/// Task tracking receiving frames
pub recv_task: Option<task::Task>,
/// Frames pending for this stream being sent to the socket
pub pending_send: buffer::Deque<B>,
@@ -19,6 +25,8 @@ impl<B> Stream<B> {
pub fn new() -> Stream<B> {
Stream {
state: State::default(),
pending_recv: buffer::Deque::new(),
recv_task: None,
pending_send: buffer::Deque::new(),
next_pending_send: None,
is_pending_send: false,
@@ -32,4 +40,10 @@ impl<B> Stream<B> {
pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> {
self.state.recv_flow_control()
}
pub fn notify_recv(&mut self) {
if let Some(ref mut task) = self.recv_task {
task.notify();
}
}
}

View File

@@ -15,7 +15,7 @@ pub struct Streams<P, B> {
#[derive(Debug)]
pub struct StreamRef<P, B> {
inner: Arc<Mutex<Inner<P, B>>>,
id: StreamId,
key: store::Key,
}
/// Fields needed to manage state related to managing the set of streams. This
@@ -53,6 +53,7 @@ impl<P, B> Streams<P, B>
}
}
/// Process inbound headers
pub fn recv_headers(&mut self, frame: frame::Headers)
-> Result<Option<frame::Headers>, ConnectionError>
{
@@ -60,8 +61,8 @@ impl<P, B> Streams<P, B>
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let stream = match me.store.find_entry(id) {
Entry::Occupied(e) => e.into_mut(),
let key = match me.store.find_entry(id) {
Entry::Occupied(e) => e.key(),
Entry::Vacant(e) => {
// Trailers cannot open a stream. Trailers are header frames
// that do not contain pseudo headers. Requests MUST contain a
@@ -78,22 +79,28 @@ impl<P, B> Streams<P, B>
}
};
if frame.is_trailers() {
let mut stream = me.store.resolve(key);
let ret = if frame.is_trailers() {
unimplemented!();
/*
if !frame.is_end_stream() {
// TODO: What error should this return?
unimplemented!();
}
try!(me.actions.recv.recv_eos(stream));
*/
} else {
try!(me.actions.recv.recv_headers(stream, frame.is_end_stream()));
}
try!(me.actions.recv.recv_headers(frame, &mut stream))
};
// TODO: move this into a fn
if stream.state.is_closed() {
me.actions.dec_num_streams(id);
}
Ok(Some(frame))
Ok(ret)
}
pub fn recv_data(&mut self, frame: &frame::Data)
@@ -241,16 +248,20 @@ impl<P, B> Streams<P, B>
me.actions.recv.send_pending_refusal(dst)
}
pub fn send_pending_window_updates<T>(&mut self, dst: &mut Codec<T, B>)
pub fn poll_complete<T>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
// TODO: sending window updates should be part of Prioritize
/*
try_ready!(me.actions.recv.send_connection_window_update(dst));
try_ready!(me.actions.recv.send_stream_window_update(&mut me.store, dst));
*/
Ok(().into())
me.actions.send.poll_complete(&mut me.store, dst)
}
}
@@ -260,7 +271,7 @@ impl<B> Streams<client::Peer, B>
pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool)
-> Result<StreamRef<client::Peer, B>, ConnectionError>
{
let id = {
let key = {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -279,16 +290,29 @@ impl<B> Streams<client::Peer, B>
// closed state.
debug_assert!(!stream.state.is_closed());
id
stream.key()
};
Ok(StreamRef {
inner: self.inner.clone(),
id: id,
key: key,
})
}
}
impl<B> StreamRef<client::Peer, B>
where B: Buf,
{
pub fn poll_response(&mut self) -> Poll<Response<()>, ConnectionError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.key);
me.actions.recv.poll_response(&mut stream)
}
}
impl<P, B> Actions<P, B>
where P: Peer,
B: Buf,