Much work

This commit is contained in:
Carl Lerche
2017-08-03 15:50:13 -07:00
parent 7a804601c5
commit dd8412d660
11 changed files with 211 additions and 104 deletions

View File

@@ -25,8 +25,7 @@ pub struct Client<T, B: IntoBuf> {
/// Client half of an active HTTP/2.0 stream. /// Client half of an active HTTP/2.0 stream.
pub struct Stream<B: IntoBuf> { pub struct Stream<B: IntoBuf> {
inner: proto::Stream<Peer>, inner: proto::StreamRef<Peer, B::Buf>,
_p: ::std::marker::PhantomData<B>,
} }
impl<T> Client<T, Bytes> impl<T> Client<T, Bytes>
@@ -86,7 +85,6 @@ impl<T, B> Client<T, B>
self.connection.send_request(request, end_of_stream) self.connection.send_request(request, end_of_stream)
.map(|stream| Stream { .map(|stream| Stream {
inner: stream, inner: stream,
_p: ::std::marker::PhantomData,
}) })
} }
} }

View File

@@ -20,6 +20,8 @@ extern crate fnv;
extern crate byteorder; extern crate byteorder;
extern crate slab;
#[macro_use] #[macro_use]
extern crate log; extern crate log;

View File

@@ -19,7 +19,7 @@ pub struct Connection<T, P, B: IntoBuf = Bytes> {
// TODO: Remove <B> // TODO: Remove <B>
ping_pong: PingPong<B::Buf>, ping_pong: PingPong<B::Buf>,
settings: Settings, settings: Settings,
streams: Streams<P>, streams: Streams<P, B::Buf>,
_phantom: PhantomData<P>, _phantom: PhantomData<P>,
} }
@@ -255,7 +255,7 @@ impl<T, B> Connection<T, client::Peer, B>
{ {
/// Initialize a new HTTP/2.0 stream and send the message. /// Initialize a new HTTP/2.0 stream and send the message.
pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool)
-> Result<Stream<client::Peer>, ConnectionError> -> Result<StreamRef<client::Peer, B::Buf>, ConnectionError>
{ {
self.streams.send_request(request, end_of_stream) self.streams.send_request(request, end_of_stream)
} }

View File

@@ -6,7 +6,7 @@ mod settings;
mod streams; mod streams;
pub use self::connection::Connection; pub use self::connection::Connection;
pub use self::streams::{Streams, Stream}; pub use self::streams::{Streams, StreamRef};
use self::framed_read::FramedRead; use self::framed_read::FramedRead;
use self::framed_write::FramedWrite; use self::framed_write::FramedWrite;

View File

@@ -0,0 +1,60 @@
use frame::{self, Frame};
use slab::Slab;
use std::marker::PhantomData;
/// Buffers frames for multiple streams.
#[derive(Debug)]
pub struct Buffer<B> {
slab: Slab<Slot<B>>,
}
/// A sequence of frames in a `Buffer`
#[derive(Debug)]
pub struct Deque<B> {
indices: Option<Indices>,
_p: PhantomData<B>,
}
/// Tracks the head & tail for a sequence of frames in a `Buffer`.
#[derive(Debug, Default)]
struct Indices {
head: usize,
tail: usize,
}
#[derive(Debug)]
struct Slot<B> {
frame: Frame<B>,
next: usize,
}
impl<B> Buffer<B> {
pub fn new() -> Self {
Buffer {
slab: Slab::new(),
}
}
}
impl<B> Deque<B> {
pub fn new() -> Self {
Deque {
indices: None,
_p: PhantomData,
}
}
pub fn is_empty(&self) -> bool {
self.indices.is_none()
}
pub fn push_back(&mut self, buf: &mut Buffer<B>, val: Frame<B>) {
unimplemented!();
}
pub fn pop_front(&mut self, buf: &mut Buffer<B>) -> Option<Frame<B>> {
unimplemented!();
}
}

View File

@@ -1,3 +1,4 @@
mod buffer;
mod flow_control; mod flow_control;
mod recv; mod recv;
mod send; mod send;
@@ -6,13 +7,15 @@ mod store;
mod stream; mod stream;
mod streams; mod streams;
pub use self::streams::{Streams, Stream}; pub use self::streams::{Streams, StreamRef};
use self::buffer::Buffer;
use self::flow_control::FlowControl; use self::flow_control::FlowControl;
use self::recv::Recv; use self::recv::Recv;
use self::send::Send; use self::send::Send;
use self::state::State; use self::state::State;
use self::store::{Store, Entry}; use self::store::{Store, Entry};
use self::stream::Stream;
use {frame, StreamId, ConnectionError}; use {frame, StreamId, ConnectionError};
use proto::*; use proto::*;

View File

@@ -8,7 +8,7 @@ use std::collections::VecDeque;
use std::marker::PhantomData; use std::marker::PhantomData;
#[derive(Debug)] #[derive(Debug)]
pub struct Recv<P> { pub(super) struct Recv<P, B> {
/// Maximum number of remote initiated streams /// Maximum number of remote initiated streams
max_streams: Option<usize>, max_streams: Option<usize>,
@@ -26,10 +26,13 @@ pub struct Recv<P> {
/// Refused StreamId, this represents a frame that must be sent out. /// Refused StreamId, this represents a frame that must be sent out.
refused: Option<StreamId>, refused: Option<StreamId>,
_p: PhantomData<P>, _p: PhantomData<(P, B)>,
} }
impl<P: Peer> Recv<P> { impl<P, B> Recv<P, B>
where P: Peer,
B: Buf,
{
pub fn new(config: &Config) -> Self { pub fn new(config: &Config) -> Self {
Recv { Recv {
max_streams: config.max_remote_initiated, max_streams: config.max_remote_initiated,
@@ -45,7 +48,7 @@ impl<P: Peer> Recv<P> {
/// Update state reflecting a new, remotely opened stream /// Update state reflecting a new, remotely opened stream
/// ///
/// Returns the stream state if successful. `None` if refused /// Returns the stream state if successful. `None` if refused
pub fn open(&mut self, id: StreamId) -> Result<Option<State>, ConnectionError> { pub fn open(&mut self, id: StreamId) -> Result<Option<Stream<B>>, ConnectionError> {
assert!(self.refused.is_none()); assert!(self.refused.is_none());
try!(self.ensure_can_open(id)); try!(self.ensure_can_open(id));
@@ -60,25 +63,25 @@ impl<P: Peer> Recv<P> {
// Increment the number of remote initiated streams // Increment the number of remote initiated streams
self.num_streams += 1; self.num_streams += 1;
Ok(Some(State::default())) Ok(Some(Stream::new()))
} }
/// Transition the stream state based on receiving headers /// Transition the stream state based on receiving headers
pub fn recv_headers(&mut self, state: &mut State, eos: bool) pub fn recv_headers(&mut self, stream: &mut Stream<B>, eos: bool)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
state.recv_open(self.init_window_sz, eos) stream.state.recv_open(self.init_window_sz, eos)
} }
pub fn recv_eos(&mut self, state: &mut State) pub fn recv_eos(&mut self, stream: &mut Stream<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
state.recv_close() stream.state.recv_close()
} }
pub fn recv_data(&mut self, pub fn recv_data(&mut self,
frame: &frame::Data, frame: &frame::Data,
state: &mut State) stream: &mut Stream<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
let sz = frame.payload().len(); let sz = frame.payload().len();
@@ -89,7 +92,7 @@ impl<P: Peer> Recv<P> {
let sz = sz as WindowSize; let sz = sz as WindowSize;
match state.recv_flow_control() { match stream.recv_flow_control() {
Some(flow) => { Some(flow) => {
// Ensure there's enough capacity on the connection before // Ensure there's enough capacity on the connection before
// acting on the stream. // acting on the stream.
@@ -106,7 +109,7 @@ impl<P: Peer> Recv<P> {
} }
if frame.is_end_stream() { if frame.is_end_stream() {
try!(state.recv_close()); try!(stream.state.recv_close());
} }
Ok(()) Ok(())
@@ -133,10 +136,9 @@ impl<P: Peer> Recv<P> {
} }
/// Send any pending refusals. /// Send any pending refusals.
pub fn send_pending_refusal<T, B>(&mut self, dst: &mut Codec<T, B>) pub fn send_pending_refusal<T>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError> -> Poll<(), ConnectionError>
where T: AsyncWrite, where T: AsyncWrite,
B: Buf,
{ {
if let Some(stream_id) = self.refused.take() { if let Some(stream_id) = self.refused.take() {
let frame = frame::Reset::new(stream_id, RefusedStream); let frame = frame::Reset::new(stream_id, RefusedStream);
@@ -168,11 +170,11 @@ impl<P: Peer> Recv<P> {
pub fn expand_stream_window(&mut self, pub fn expand_stream_window(&mut self,
id: StreamId, id: StreamId,
sz: WindowSize, sz: WindowSize,
state: &mut State) stream: &mut Stream<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
// TODO: handle overflow // TODO: handle overflow
if let Some(flow) = state.recv_flow_control() { if let Some(flow) = stream.recv_flow_control() {
flow.expand_window(sz); flow.expand_window(sz);
self.pending_window_updates.push_back(id); self.pending_window_updates.push_back(id);
} }
@@ -181,10 +183,9 @@ impl<P: Peer> Recv<P> {
} }
/// Send connection level window update /// Send connection level window update
pub fn send_connection_window_update<T, B>(&mut self, dst: &mut Codec<T, B>) pub fn send_connection_window_update<T>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError> -> Poll<(), ConnectionError>
where T: AsyncWrite, where T: AsyncWrite,
B: Buf,
{ {
if let Some(incr) = self.flow_control.peek_window_update() { if let Some(incr) = self.flow_control.peek_window_update() {
let frame = frame::WindowUpdate::new(StreamId::zero(), incr); let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
@@ -200,16 +201,15 @@ impl<P: Peer> Recv<P> {
} }
/// Send stream level window update /// Send stream level window update
pub fn send_stream_window_update<T, B>(&mut self, pub fn send_stream_window_update<T>(&mut self,
streams: &mut Store, streams: &mut Store<B>,
dst: &mut Codec<T, B>) dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError> -> Poll<(), ConnectionError>
where T: AsyncWrite, where T: AsyncWrite,
B: Buf,
{ {
while let Some(id) = self.pending_window_updates.pop_front() { while let Some(id) = self.pending_window_updates.pop_front() {
let flow = streams.get_mut(&id) let flow = streams.get_mut(&id)
.and_then(|state| state.recv_flow_control()); .and_then(|stream| stream.recv_flow_control());
if let Some(flow) = flow { if let Some(flow) = flow {

View File

@@ -10,7 +10,7 @@ use std::collections::VecDeque;
use std::marker::PhantomData; use std::marker::PhantomData;
#[derive(Debug)] #[derive(Debug)]
pub struct Send<P> { pub(super) struct Send<P, B> {
/// Maximum number of locally initiated streams /// Maximum number of locally initiated streams
max_streams: Option<usize>, max_streams: Option<usize>,
@@ -30,6 +30,9 @@ pub struct Send<P> {
// XXX It would be cool if this didn't exist. // XXX It would be cool if this didn't exist.
pending_window_updates: VecDeque<StreamId>, pending_window_updates: VecDeque<StreamId>,
/// Holds frames that are waiting to be written to the socket
buffer: Buffer<B>,
/// When `poll_window_update` is not ready, then the calling task is saved to /// When `poll_window_update` is not ready, then the calling task is saved to
/// be notified later. Access to poll_window_update must not be shared across tasks, /// be notified later. Access to poll_window_update must not be shared across tasks,
/// as we only track a single task (and *not* i.e. a task per stream id). /// as we only track a single task (and *not* i.e. a task per stream id).
@@ -38,7 +41,10 @@ pub struct Send<P> {
_p: PhantomData<P>, _p: PhantomData<P>,
} }
impl<P: Peer> Send<P> { impl<P, B> Send<P, B>
where P: Peer,
B: Buf,
{
pub fn new(config: &Config) -> Self { pub fn new(config: &Config) -> Self {
let next_stream_id = if P::is_server() { let next_stream_id = if P::is_server() {
2 2
@@ -53,6 +59,7 @@ impl<P: Peer> Send<P> {
init_window_sz: config.init_local_window_sz, init_window_sz: config.init_local_window_sz,
flow_control: FlowControl::new(config.init_local_window_sz), flow_control: FlowControl::new(config.init_local_window_sz),
pending_window_updates: VecDeque::new(), pending_window_updates: VecDeque::new(),
buffer: Buffer::new(),
blocked: None, blocked: None,
_p: PhantomData, _p: PhantomData,
} }
@@ -61,7 +68,7 @@ impl<P: Peer> Send<P> {
/// Update state reflecting a new, locally opened stream /// Update state reflecting a new, locally opened stream
/// ///
/// Returns the stream state if successful. `None` if refused /// Returns the stream state if successful. `None` if refused
pub fn open(&mut self) -> Result<(StreamId, State), ConnectionError> { pub fn open(&mut self) -> Result<(StreamId, Stream<B>), ConnectionError> {
try!(self.ensure_can_open()); try!(self.ensure_can_open());
if let Some(max) = self.max_streams { if let Some(max) = self.max_streams {
@@ -70,7 +77,7 @@ impl<P: Peer> Send<P> {
} }
} }
let ret = (self.next_stream_id, State::default()); let ret = (self.next_stream_id, Stream::new());
// Increment the number of locally initiated streams // Increment the number of locally initiated streams
self.num_streams += 1; self.num_streams += 1;
@@ -79,21 +86,24 @@ impl<P: Peer> Send<P> {
Ok(ret) Ok(ret)
} }
pub fn send_headers(&mut self, state: &mut State, eos: bool) pub fn send_headers(&mut self, stream: &mut Stream<B>, frame: frame::Headers)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
state.send_open(self.init_window_sz, eos) // Update the state
stream.state.send_open(self.init_window_sz, frame.is_end_stream())?;
// stream.send_buf.headers = Some(frame);
Ok(())
} }
pub fn send_eos(&mut self, state: &mut State) pub fn send_eos(&mut self, stream: &mut Stream<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
state.send_close() stream.state.send_close()
} }
pub fn send_data<B: Buf>(&mut self, pub fn send_data(&mut self,
frame: &frame::Data<B>, frame: &frame::Data<B>,
state: &mut State) stream: &mut Stream<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
let sz = frame.payload().remaining(); let sz = frame.payload().remaining();
@@ -107,7 +117,7 @@ impl<P: Peer> Send<P> {
// Make borrow checker happy // Make borrow checker happy
loop { loop {
match state.send_flow_control() { match stream.send_flow_control() {
Some(flow) => { Some(flow) => {
try!(self.flow_control.ensure_window(sz, FlowControlViolation)); try!(self.flow_control.ensure_window(sz, FlowControlViolation));
@@ -123,7 +133,7 @@ impl<P: Peer> Send<P> {
None => {} None => {}
} }
if state.is_closed() { if stream.state.is_closed() {
return Err(InactiveStreamId.into()) return Err(InactiveStreamId.into())
} else { } else {
return Err(UnexpectedFrameType.into()) return Err(UnexpectedFrameType.into())
@@ -131,14 +141,14 @@ impl<P: Peer> Send<P> {
} }
if frame.is_end_stream() { if frame.is_end_stream() {
try!(state.send_close()); try!(stream.state.send_close());
} }
Ok(()) Ok(())
} }
/// Get pending window updates /// Get pending window updates
pub fn poll_window_update(&mut self, streams: &mut Store) pub fn poll_window_update(&mut self, streams: &mut Store<B>)
-> Poll<WindowUpdate, ConnectionError> -> Poll<WindowUpdate, ConnectionError>
{ {
// This biases connection window updates, which probably makes sense. // This biases connection window updates, which probably makes sense.
@@ -152,7 +162,7 @@ impl<P: Peer> Send<P> {
let update = self.pending_window_updates.pop_front() let update = self.pending_window_updates.pop_front()
.and_then(|id| { .and_then(|id| {
streams.get_mut(&id) streams.get_mut(&id)
.and_then(|state| state.send_flow_control()) .and_then(|stream| stream.send_flow_control())
.and_then(|flow| flow.apply_window_update()) .and_then(|flow| flow.apply_window_update())
.map(|incr| WindowUpdate::new(id, incr)) .map(|incr| WindowUpdate::new(id, incr))
}); });
@@ -184,10 +194,10 @@ impl<P: Peer> Send<P> {
pub fn recv_stream_window_update(&mut self, pub fn recv_stream_window_update(&mut self,
frame: frame::WindowUpdate, frame: frame::WindowUpdate,
state: &mut State) stream: &mut Stream<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
if let Some(flow) = state.send_flow_control() { if let Some(flow) = stream.send_flow_control() {
// TODO: Handle invalid increment // TODO: Handle invalid increment
flow.expand_window(frame.size_increment()); flow.expand_window(frame.size_increment());
} }

View File

@@ -1,32 +1,32 @@
extern crate slab;
use super::*; use super::*;
use slab;
use std::collections::{HashMap, hash_map}; use std::collections::{HashMap, hash_map};
/// Storage for streams /// Storage for streams
#[derive(Debug)] #[derive(Debug)]
pub struct Store { pub(super) struct Store<B> {
slab: slab::Slab<State>, slab: slab::Slab<Stream<B>>,
ids: HashMap<StreamId, usize>, ids: HashMap<StreamId, usize>,
} }
pub enum Entry<'a> { pub(super) enum Entry<'a, B: 'a> {
Occupied(OccupiedEntry<'a>), Occupied(OccupiedEntry<'a, B>),
Vacant(VacantEntry<'a>), Vacant(VacantEntry<'a, B>),
} }
pub struct OccupiedEntry<'a> { pub(super) struct OccupiedEntry<'a, B: 'a> {
ids: hash_map::OccupiedEntry<'a, StreamId, usize>, ids: hash_map::OccupiedEntry<'a, StreamId, usize>,
slab: &'a mut slab::Slab<State>, slab: &'a mut slab::Slab<Stream<B>>,
} }
pub struct VacantEntry<'a> { pub(super) struct VacantEntry<'a, B: 'a> {
ids: hash_map::VacantEntry<'a, StreamId, usize>, ids: hash_map::VacantEntry<'a, StreamId, usize>,
slab: &'a mut slab::Slab<State>, slab: &'a mut slab::Slab<Stream<B>>,
} }
impl Store { impl<B> Store<B> {
pub fn new() -> Self { pub fn new() -> Self {
Store { Store {
slab: slab::Slab::new(), slab: slab::Slab::new(),
@@ -34,7 +34,7 @@ impl Store {
} }
} }
pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut State> { pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut Stream<B>> {
if let Some(handle) = self.ids.get(id) { if let Some(handle) = self.ids.get(id) {
Some(&mut self.slab[*handle]) Some(&mut self.slab[*handle])
} else { } else {
@@ -42,12 +42,12 @@ impl Store {
} }
} }
pub fn insert(&mut self, id: StreamId, val: State) { pub fn insert(&mut self, id: StreamId, val: Stream<B>) {
let handle = self.slab.insert(val); let handle = self.slab.insert(val);
assert!(self.ids.insert(id, handle).is_none()); assert!(self.ids.insert(id, handle).is_none());
} }
pub fn entry(&mut self, id: StreamId) -> Entry { pub fn entry(&mut self, id: StreamId) -> Entry<B> {
use self::hash_map::Entry::*; use self::hash_map::Entry::*;
match self.ids.entry(id) { match self.ids.entry(id) {
@@ -67,14 +67,14 @@ impl Store {
} }
} }
impl<'a> OccupiedEntry<'a> { impl<'a, B> OccupiedEntry<'a, B> {
pub fn into_mut(self) -> &'a mut State { pub fn into_mut(self) -> &'a mut Stream<B> {
&mut self.slab[*self.ids.get()] &mut self.slab[*self.ids.get()]
} }
} }
impl<'a> VacantEntry<'a> { impl<'a, B> VacantEntry<'a, B> {
pub fn insert(self, value: State) -> &'a mut State { pub fn insert(self, value: Stream<B>) -> &'a mut Stream<B> {
// Insert the value in the slab // Insert the value in the slab
let handle = self.slab.insert(value); let handle = self.slab.insert(value);

View File

@@ -0,0 +1,27 @@
use super::*;
#[derive(Debug)]
pub(super) struct Stream<B> {
/// Current state of the stream
pub state: State,
/// Frames pending for this stream being sent to the socket
pub pending_send: buffer::Deque<B>,
}
impl<B> Stream<B> {
pub fn new() -> Stream<B> {
Stream {
state: State::default(),
pending_send: buffer::Deque::new(),
}
}
pub fn send_flow_control(&mut self) -> Option<&mut FlowControl> {
self.state.send_flow_control()
}
pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> {
self.state.recv_flow_control()
}
}

View File

@@ -7,13 +7,14 @@ use std::sync::{Arc, Mutex};
// TODO: All the VecDeques should become linked lists using the State // TODO: All the VecDeques should become linked lists using the State
// values. // values.
#[derive(Debug)] #[derive(Debug)]
pub struct Streams<P> { pub struct Streams<P, B> {
inner: Arc<Mutex<Inner<P>>>, inner: Arc<Mutex<Inner<P, B>>>,
} }
/// Reference to the stream state
#[derive(Debug)] #[derive(Debug)]
pub struct Stream<P> { pub struct StreamRef<P, B> {
inner: Arc<Mutex<Inner<P>>>, inner: Arc<Mutex<Inner<P, B>>>,
id: StreamId, id: StreamId,
} }
@@ -22,21 +23,24 @@ pub struct Stream<P> {
/// ///
/// TODO: better name /// TODO: better name
#[derive(Debug)] #[derive(Debug)]
struct Inner<P> { struct Inner<P, B> {
actions: Actions<P>, actions: Actions<P, B>,
store: Store, store: Store<B>,
} }
#[derive(Debug)] #[derive(Debug)]
struct Actions<P> { struct Actions<P, B> {
/// Manages state transitions initiated by receiving frames /// Manages state transitions initiated by receiving frames
recv: Recv<P>, recv: Recv<P, B>,
/// Manages state transitions initiated by sending frames /// Manages state transitions initiated by sending frames
send: Send<P>, send: Send<P, B>,
} }
impl<P: Peer> Streams<P> { impl<P, B> Streams<P, B>
where P: Peer,
B: Buf,
{
pub fn new(config: Config) -> Self { pub fn new(config: Config) -> Self {
Streams { Streams {
inner: Arc::new(Mutex::new(Inner { inner: Arc::new(Mutex::new(Inner {
@@ -56,7 +60,7 @@ impl<P: Peer> Streams<P> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let state = match me.store.entry(id) { let stream = match me.store.entry(id) {
Entry::Occupied(e) => e.into_mut(), Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => { Entry::Vacant(e) => {
// Trailers cannot open a stream. Trailers are header frames // Trailers cannot open a stream. Trailers are header frames
@@ -68,7 +72,7 @@ impl<P: Peer> Streams<P> {
} }
match try!(me.actions.recv.open(id)) { match try!(me.actions.recv.open(id)) {
Some(state) => e.insert(state), Some(stream) => e.insert(stream),
None => return Ok(None), None => return Ok(None),
} }
} }
@@ -80,12 +84,12 @@ impl<P: Peer> Streams<P> {
unimplemented!(); unimplemented!();
} }
try!(me.actions.recv.recv_eos(state)); try!(me.actions.recv.recv_eos(stream));
} else { } else {
try!(me.actions.recv.recv_headers(state, frame.is_end_stream())); try!(me.actions.recv.recv_headers(stream, frame.is_end_stream()));
} }
if state.is_closed() { if stream.state.is_closed() {
me.actions.dec_num_streams(id); me.actions.dec_num_streams(id);
} }
@@ -99,16 +103,16 @@ impl<P: Peer> Streams<P> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let state = match me.store.get_mut(&id) { let stream = match me.store.get_mut(&id) {
Some(state) => state, Some(stream) => stream,
None => return Err(ProtocolError.into()), None => return Err(ProtocolError.into()),
}; };
// Ensure there's enough capacity on the connection before acting on the // Ensure there's enough capacity on the connection before acting on the
// stream. // stream.
try!(me.actions.recv.recv_data(frame, state)); try!(me.actions.recv.recv_data(frame, stream));
if state.is_closed() { if stream.state.is_closed() {
me.actions.dec_num_streams(id); me.actions.dec_num_streams(id);
} }
@@ -180,23 +184,23 @@ impl<P: Peer> Streams<P> {
*/ */
} }
pub fn send_data<B: Buf>(&mut self, frame: &frame::Data<B>) pub fn send_data(&mut self, frame: &frame::Data<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
let id = frame.stream_id(); let id = frame.stream_id();
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let state = match me.store.get_mut(&id) { let stream = match me.store.get_mut(&id) {
Some(state) => state, Some(stream) => stream,
None => return Err(UnexpectedFrameType.into()), None => return Err(UnexpectedFrameType.into()),
}; };
// Ensure there's enough capacity on the connection before acting on the // Ensure there's enough capacity on the connection before acting on the
// stream. // stream.
try!(me.actions.send.send_data(frame, state)); try!(me.actions.send.send_data(frame, stream));
if state.is_closed() { if stream.state.is_closed() {
me.actions.dec_num_streams(id); me.actions.dec_num_streams(id);
} }
@@ -228,20 +232,18 @@ impl<P: Peer> Streams<P> {
Ok(()) Ok(())
} }
pub fn send_pending_refusal<T, B>(&mut self, dst: &mut Codec<T, B>) pub fn send_pending_refusal<T>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError> -> Poll<(), ConnectionError>
where T: AsyncWrite, where T: AsyncWrite,
B: Buf,
{ {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
me.actions.recv.send_pending_refusal(dst) me.actions.recv.send_pending_refusal(dst)
} }
pub fn send_pending_window_updates<T, B>(&mut self, dst: &mut Codec<T, B>) pub fn send_pending_window_updates<T>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError> -> Poll<(), ConnectionError>
where T: AsyncWrite, where T: AsyncWrite,
B: Buf,
{ {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
@@ -252,41 +254,46 @@ impl<P: Peer> Streams<P> {
} }
} }
impl Streams<client::Peer> { impl<B> Streams<client::Peer, B>
where B: Buf,
{
pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool)
-> Result<Stream<client::Peer>, ConnectionError> -> Result<StreamRef<client::Peer, B>, ConnectionError>
{ {
let id = { let id = {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
// Initialize a new stream. This fails if the connection is at capacity. // Initialize a new stream. This fails if the connection is at capacity.
let (id, mut state) = me.actions.send.open()?; let (id, mut stream) = me.actions.send.open()?;
// Convert the message // Convert the message
let headers = client::Peer::convert_send_message( let headers = client::Peer::convert_send_message(
id, request, end_of_stream); id, request, end_of_stream);
me.actions.send.send_headers(&mut state, end_of_stream)?; me.actions.send.send_headers(&mut stream, headers)?;
// Given that the stream has been initialized, it should not be in the // Given that the stream has been initialized, it should not be in the
// closed state. // closed state.
debug_assert!(!state.is_closed()); debug_assert!(!stream.state.is_closed());
// Store the state // Store the state
me.store.insert(id, state); me.store.insert(id, stream);
id id
}; };
Ok(Stream { Ok(StreamRef {
inner: self.inner.clone(), inner: self.inner.clone(),
id: id, id: id,
}) })
} }
} }
impl<P: Peer> Actions<P> { impl<P, B> Actions<P, B>
where P: Peer,
B: Buf,
{
fn dec_num_streams(&mut self, id: StreamId) { fn dec_num_streams(&mut self, id: StreamId) {
if self.is_local_init(id) { if self.is_local_init(id) {
self.send.dec_num_streams(); self.send.dec_num_streams();