diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 1fb11ac..1d119fe 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -103,6 +103,8 @@ impl Connection } }; + debug!("recv; frame={:?}", frame); + match frame { Some(Headers(frame)) => { trace!("recv HEADERS; frame={:?}", frame); diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 4151066..4c31d0e 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -49,7 +49,6 @@ impl FramedRead { } let kind = head.kind(); - debug!("decoded; kind={:?}", kind); let frame = match kind { Kind::Settings => { @@ -106,7 +105,6 @@ impl FramedRead { unimplemented!() } }; - debug!("decoded; frame={:?}", frame); Ok(Some(frame)) } @@ -128,7 +126,6 @@ impl futures::Stream for FramedRead trace!("poll; bytes={}B", bytes.len()); if let Some(frame) = try!(self.decode_frame(bytes)) { - debug!("poll; frame={:?}", frame); return Ok(Async::Ready(Some(frame))); } } diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index 1ce765c..4dbedbe 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -104,7 +104,7 @@ impl Sink for FramedWrite return Ok(AsyncSink::NotReady(item)); } - trace!("send; frame={:?}", item); + debug!("send; frame={:?}", item); match item { Frame::Data(mut v) => { diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index cf836b3..6777eff 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -16,6 +16,10 @@ pub(super) struct Prioritize { /// Holds frames that are waiting to be written to the socket buffer: Buffer, + + /// Holds the connection task. This signals the connection that there is + /// data to flush. + conn_task: Option, } impl Prioritize @@ -28,6 +32,7 @@ impl Prioritize flow_control: FlowControl::new(config.init_local_window_sz), buffered_data: 0, buffer: Buffer::new(), + conn_task: None, } } @@ -71,6 +76,10 @@ impl Prioritize // Queue the stream push_sender(&mut self.pending_send, stream); + + if let Some(ref task) = self.conn_task { + task.notify(); + } } pub fn poll_complete(&mut self, @@ -79,12 +88,16 @@ impl Prioritize -> Poll<(), ConnectionError> where T: AsyncWrite, { + self.conn_task = Some(task::current()); + + trace!("poll_complete"); loop { // Ensure codec is ready try_ready!(dst.poll_ready()); match self.pop_frame(store) { Some(frame) => { + trace!("writing frame={:?}", frame); // Subtract the data size self.buffered_data -= frame.flow_len(); diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 32d6fd9..0b2edd0 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -79,6 +79,7 @@ impl Send where B: Buf { stream: &mut store::Ptr) -> Result<(), ConnectionError> { + trace!("send_headers; frame={:?}", frame); // Update the state stream.state.send_open(self.init_window_sz, frame.is_end_stream())?; @@ -252,6 +253,26 @@ impl Send where B: Buf { Ok(()) } + pub fn window_size(&mut self, stream: &mut Stream) -> usize { + if let Some(flow) = stream.state.send_flow_control() { + // Track the current task + stream.send_task = Some(task::current()); + + // We are observing the window, so apply the pending updates + flow.apply_window_update(); + + let mut window = flow.effective_window_size(); + + if stream.unadvertised_send_window > window { + return 0; + } + + return (window - stream.unadvertised_send_window) as usize; + } + + 0 + } + pub fn dec_num_streams(&mut self) { self.num_streams -= 1; } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index e203279..bfa6fa0 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -190,40 +190,6 @@ impl Streams me.actions.recv.recv_push_promise::

(frame, &mut stream) } - pub fn send_headers(&mut self, headers: frame::Headers) - -> Result<(), ConnectionError> - { - unimplemented!(); - /* - let id = frame.stream_id(); - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - // let (id, state) = me.actions.send.open()); - - - let state = match me.store.entry(id) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => { - let (id, state) = try!(me.actions.send.open()); - e.insert(state) - } - }; - - if frame.is_trailers() { - try!(me.actions.send.send_eos(state)); - } else { - try!(me.actions.send.send_headers(state, frame.is_end_stream())); - } - - if state.is_closed() { - me.actions.dec_num_streams(id); - } - - Ok(()) - */ - } - pub fn next_incoming(&mut self) -> Option> { let key = { let mut me = self.inner.lock().unwrap(); @@ -399,6 +365,15 @@ impl StreamRef Ok(chunk.into()) } + + /// Returns the current window size + pub fn window_size(&mut self) -> usize { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + me.actions.send.window_size(&mut stream) + } } impl Clone for StreamRef { diff --git a/src/server.rs b/src/server.rs index f249c96..1fee874 100644 --- a/src/server.rs +++ b/src/server.rs @@ -26,6 +26,16 @@ pub struct Stream { inner: proto::StreamRef, } +#[derive(Debug)] +pub struct Send { + src: T, + dst: Option>, + // Pending data + buf: Option, + // True when this is the end of the stream + eos: bool, +} + /// Flush a Sink struct Flush { inner: Option, @@ -87,6 +97,7 @@ impl Server Handshake { inner: Box::new(handshake) } } + /// Returns `Ready` when the underlying connection has closed. pub fn poll_close(&mut self) -> Poll<(), ConnectionError> { self.connection.poll() } @@ -141,13 +152,21 @@ impl fmt::Debug for Server // ===== impl Stream ===== impl Stream { + /// Returns the current window size. + /// + /// This function also registers interest changes. The current task will be + /// notified when the window size is *increased*. + pub fn window_size(&mut self) -> usize { + self.inner.window_size() + } + pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool) -> Result<(), ConnectionError> { self.inner.send_response(response, end_of_stream) } - /// Send data + /// Send a single data frame pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ConnectionError> { @@ -162,6 +181,67 @@ impl Stream { } } +impl Stream { + /// Send the body + pub fn send(self, src: T, end_of_stream: bool,) -> Send + where T: futures::Stream, + { + Send { + src: src, + dst: Some(self), + buf: None, + eos: end_of_stream, + } + } +} + +// ===== impl Send ===== + +impl Future for Send + where T: futures::Stream, +{ + type Item = Stream; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll { + use futures::Stream; + + loop { + if self.buf.is_none() { + self.buf = try_ready!(self.src.poll()); + } + + match self.buf.take() { + Some(mut buf) => { + let cap = self.dst.as_mut().unwrap().window_size(); + + if cap == 0 { + self.buf = Some(buf); + return Ok(Async::NotReady); + } if cap >= buf.len() { + self.dst.as_mut().unwrap().send_data(buf, false)?; + } else { + let chunk = buf.split_to(cap); + self.buf = Some(buf); + self.dst.as_mut().unwrap().send_data(chunk, false)?; + + return Ok(Async::NotReady); + } + } + None => { + // TODO: It would be nice to not have to send an extra + // frame... + if self.eos { + self.dst.as_mut().unwrap().send_data(Bytes::new(), true)?; + } + + return Ok(Async::Ready(self.dst.take().unwrap())); + } + } + } + } +} + // ===== impl Flush ===== impl Flush {