diff --git a/examples/server.rs b/examples/server.rs index cee18dc..5c4ea90 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -33,9 +33,7 @@ pub fn main() { let connection = Server::handshake(socket) - .then(|res| { - let conn = res.unwrap(); - + .and_then(|conn| { println!("H2 connection bound"); conn.for_each(|(request, mut stream)| { @@ -45,12 +43,14 @@ pub fn main() { .status(status::OK) .body(()).unwrap(); - if let Err(e) = stream.send_response(response, true) { + if let Err(e) = stream.send_response(response, false) { println!(" error responding; err={:?}", e); } println!(">>>> sending data"); - stream.send_data(Bytes::from_static(b"hello world"), true).unwrap(); + if let Err(e) = stream.send_data(Bytes::from_static(b"hello world"), true) { + println!(" -> err={:?}", e); + } Ok(()) }).and_then(|_| { @@ -59,7 +59,10 @@ pub fn main() { }) }) .then(|res| { - let _ = res.unwrap(); + if let Err(e) = res { + println!(" -> err={:?}", e); + } + Ok(()) }) ; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index cdaad1f..e06f492 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -7,7 +7,7 @@ use std::{fmt, cmp}; #[derive(Debug)] pub(super) struct Prioritize { /// Queue of streams waiting for socket capacity to send a frame - pending_send: store::Queue, + pending_send: store::Queue, /// Queue of streams waiting for window capacity to produce data. pending_capacity: store::Queue, @@ -125,8 +125,6 @@ impl Prioritize // don't notify the conneciton task. Once additional capacity // becomes available, the frame will be flushed. stream.pending_send.push_back(&mut self.buffer, frame.into()); - - debug_assert!(stream.is_pending_send_capacity); } Ok(()) @@ -213,8 +211,8 @@ impl Prioritize total_requested - stream.send_flow.available(), stream.send_flow.window_size()); - trace!("try_assign_capacity; requested={}; additional={}; conn={}", - total_requested, additional, self.flow.available()); + trace!("try_assign_capacity; requested={}; additional={}; window={}; conn={}", + total_requested, additional, stream.send_flow.window_size(), self.flow.available()); if additional == 0 { // Nothing more to do @@ -373,13 +371,25 @@ impl Prioritize } } + pub fn clear_queue(&mut self, stream: &mut store::Ptr) { + trace!("clear_queue; stream-id={:?}", stream.id); + + // TODO: make this more efficient? + while let Some(frame) = stream.pending_send.pop_front(&mut self.buffer) { + trace!("dropping; frame={:?}", frame); + } + } + fn pop_frame(&mut self, store: &mut Store, max_len: usize) -> Option>> { + trace!("pop_frame"); + loop { - trace!("pop frame"); match self.pending_send.pop(store) { Some(mut stream) => { + trace!("pop_frame; stream={:?}", stream.id); + let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() { Frame::Data(mut frame) => { // Get the amount of capacity remaining for stream's @@ -459,6 +469,8 @@ impl Prioritize frame => frame.map(|_| unreachable!()), }; + trace!("pop_frame; frame={:?}", frame); + if !stream.pending_send.is_empty() { // TODO: Only requeue the sender IF it is ready to send // the next frame. i.e. don't requeue it if the next diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index ad81030..a503ed5 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -31,7 +31,7 @@ pub(super) struct Recv { pending_window_updates: store::Queue, /// New streams to be accepted - pending_accept: store::Queue, + pending_accept: store::Queue, /// Holds frames that are waiting to be read buffer: Buffer, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index f470505..c329770 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -115,12 +115,17 @@ impl Send where B: Buf { -> Result<(), ConnectionError> { if stream.state.is_closed() { + debug!("send_reset; invalid stream ID"); return Err(InactiveStreamId.into()) } stream.state.send_reset(reason)?; let frame = frame::Reset::new(stream.id, reason); + + self.prioritize.clear_queue(stream); + + trace!("send_reset -- queueing; frame={:?}", frame); self.prioritize.queue_frame(frame.into(), stream, task); Ok(()) @@ -189,17 +194,22 @@ impl Send where B: Buf { pub fn recv_stream_window_update(&mut self, sz: WindowSize, - stream: &mut store::Ptr) + stream: &mut store::Ptr, + task: &mut Option) + -> Result<(), ConnectionError> { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { - // TODO: Send reset - unimplemented!(); + debug!("recv_stream_window_update !!; err={:?}", e); + self.send_reset(FlowControlError.into(), stream, task)?; } + + Ok(()) } pub fn apply_remote_settings(&mut self, settings: &frame::Settings, - store: &mut Store) + store: &mut Store, + task: &mut Option) { if let Some(val) = settings.max_concurrent_streams() { self.max_streams = Some(val as usize); @@ -245,7 +255,7 @@ impl Send where B: Buf { let inc = val - old_val; store.for_each(|mut stream| { - self.recv_stream_window_update(inc, &mut stream); + self.recv_stream_window_update(inc, &mut stream, task); }); } } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 7c04065..6388f77 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -173,7 +173,10 @@ impl Queue /// /// If the stream is already contained by the list, return `false`. pub fn push(&mut self, stream: &mut store::Ptr) -> bool { + trace!("Queue::push"); + if N::is_queued(stream) { + trace!(" -> already queued"); return false; } @@ -185,6 +188,8 @@ impl Queue // Queue the stream match self.indices { Some(ref mut idxs) => { + trace!(" -> existing entries"); + // Update the current tail node to point to `stream` let key = stream.key(); N::set_next(&mut stream.resolve(idxs.tail), Some(key)); @@ -193,6 +198,7 @@ impl Queue idxs.tail = stream.key(); } None => { + trace!(" -> first entry"); self.indices = Some(store::Indices { head: stream.key(), tail: stream.key(), diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 5bf2abd..4a10acc 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -8,20 +8,14 @@ pub(super) struct Stream { /// Current state of the stream pub state: State, - /// Next node in the `Stream` linked list. - /// - /// This field is used in different linked lists depending on the stream - /// state. First, it is used as part of the linked list of streams waiting - /// to be accepted (either by a server or by a client as a push promise). - /// Once the stream is accepted, this is used for the linked list of streams - /// waiting to flush prioritized frames to the socket. - pub next: Option, - - /// Set to true when the stream is queued - pub is_queued: bool, - // ===== Fields related to sending ===== + /// Next node in the accept linked list + pub next_pending_send: Option, + + /// Set to true when the stream is pending accept + pub is_pending_send: bool, + /// Send data flow control pub send_flow: FlowControl, @@ -49,6 +43,12 @@ pub(super) struct Stream { // ===== Fields related to receiving ===== + /// Next node in the accept linked list + pub next_pending_accept: Option, + + /// Set to true when the stream is pending accept + pub is_pending_accept: bool, + /// Receive data flow control pub recv_flow: FlowControl, @@ -67,11 +67,14 @@ pub(super) struct Stream { pub recv_task: Option, /// The stream's pending push promises - pub pending_push_promises: store::Queue, + pub pending_push_promises: store::Queue, } #[derive(Debug)] -pub(super) struct Next; +pub(super) struct NextAccept; + +#[derive(Debug)] +pub(super) struct NextSend; #[derive(Debug)] pub(super) struct NextSendCapacity; @@ -85,11 +88,11 @@ impl Stream { Stream { id, state: State::default(), - next: None, - is_queued: false, // ===== Fields related to sending ===== + next_pending_send: None, + is_pending_send: false, send_flow: FlowControl::new(), requested_send_capacity: 0, buffered_send_data: 0, @@ -101,6 +104,8 @@ impl Stream { // ===== Fields related to receiving ===== + next_pending_accept: None, + is_pending_accept: false, recv_flow: FlowControl::new(), in_flight_recv_data: 0, next_window_update: None, @@ -135,25 +140,47 @@ impl Stream { } } -impl store::Next for Next { +impl store::Next for NextAccept { fn next(stream: &Stream) -> Option { - stream.next + stream.next_pending_accept } fn set_next(stream: &mut Stream, key: Option) { - stream.next = key; + stream.next_pending_accept = key; } fn take_next(stream: &mut Stream) -> Option { - stream.next.take() + stream.next_pending_accept.take() } fn is_queued(stream: &Stream) -> bool { - stream.is_queued + stream.is_pending_accept } fn set_queued(stream: &mut Stream, val: bool) { - stream.is_queued = val; + stream.is_pending_accept = val; + } +} + +impl store::Next for NextSend { + fn next(stream: &Stream) -> Option { + stream.next_pending_send + } + + fn set_next(stream: &mut Stream, key: Option) { + stream.next_pending_send = key; + } + + fn take_next(stream: &mut Stream) -> Option { + stream.next_pending_send.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_send + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_pending_send = val; } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 198df8d..0011f14 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -173,7 +173,7 @@ impl Streams // considers closed. It's ok... if let Some(mut stream) = me.store.find_mut(&id) { me.actions.send.recv_stream_window_update( - frame.size_increment(), &mut stream); + frame.size_increment(), &mut stream, &mut me.actions.task); } else { me.actions.recv.ensure_not_idle(id)?; } @@ -249,7 +249,8 @@ impl Streams let mut me = self.inner.lock().unwrap(); let me = &mut *me; - me.actions.send.apply_remote_settings(frame, &mut me.store); + me.actions.send.apply_remote_settings( + frame, &mut me.store, &mut me.actions.task); } pub fn poll_send_request_ready(&mut self) -> Poll<(), ConnectionError> {