Trying to get something working
This commit is contained in:
		| @@ -103,6 +103,8 @@ impl<T, P, B> Connection<T, P, B> | |||||||
|                 } |                 } | ||||||
|             }; |             }; | ||||||
|  |  | ||||||
|  |             debug!("recv; frame={:?}", frame); | ||||||
|  |  | ||||||
|             match frame { |             match frame { | ||||||
|                 Some(Headers(frame)) => { |                 Some(Headers(frame)) => { | ||||||
|                     trace!("recv HEADERS; frame={:?}", frame); |                     trace!("recv HEADERS; frame={:?}", frame); | ||||||
|   | |||||||
| @@ -49,7 +49,6 @@ impl<T> FramedRead<T> { | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         let kind = head.kind(); |         let kind = head.kind(); | ||||||
|         debug!("decoded; kind={:?}", kind); |  | ||||||
|  |  | ||||||
|         let frame = match kind { |         let frame = match kind { | ||||||
|             Kind::Settings => { |             Kind::Settings => { | ||||||
| @@ -106,7 +105,6 @@ impl<T> FramedRead<T> { | |||||||
|                 unimplemented!() |                 unimplemented!() | ||||||
|             } |             } | ||||||
|         }; |         }; | ||||||
|         debug!("decoded; frame={:?}", frame); |  | ||||||
|  |  | ||||||
|         Ok(Some(frame)) |         Ok(Some(frame)) | ||||||
|     } |     } | ||||||
| @@ -128,7 +126,6 @@ impl<T> futures::Stream for FramedRead<T> | |||||||
|  |  | ||||||
|             trace!("poll; bytes={}B", bytes.len()); |             trace!("poll; bytes={}B", bytes.len()); | ||||||
|             if let Some(frame) = try!(self.decode_frame(bytes)) { |             if let Some(frame) = try!(self.decode_frame(bytes)) { | ||||||
|                 debug!("poll; frame={:?}", frame); |  | ||||||
|                 return Ok(Async::Ready(Some(frame))); |                 return Ok(Async::Ready(Some(frame))); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|   | |||||||
| @@ -104,7 +104,7 @@ impl<T, B> Sink for FramedWrite<T, B> | |||||||
|             return Ok(AsyncSink::NotReady(item)); |             return Ok(AsyncSink::NotReady(item)); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         trace!("send; frame={:?}", item); |         debug!("send; frame={:?}", item); | ||||||
|  |  | ||||||
|         match item { |         match item { | ||||||
|             Frame::Data(mut v) => { |             Frame::Data(mut v) => { | ||||||
|   | |||||||
| @@ -16,6 +16,10 @@ pub(super) struct Prioritize<B> { | |||||||
|  |  | ||||||
|     /// Holds frames that are waiting to be written to the socket |     /// Holds frames that are waiting to be written to the socket | ||||||
|     buffer: Buffer<B>, |     buffer: Buffer<B>, | ||||||
|  |  | ||||||
|  |     /// Holds the connection task. This signals the connection that there is | ||||||
|  |     /// data to flush. | ||||||
|  |     conn_task: Option<task::Task>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<B> Prioritize<B> | impl<B> Prioritize<B> | ||||||
| @@ -28,6 +32,7 @@ impl<B> Prioritize<B> | |||||||
|             flow_control: FlowControl::new(config.init_local_window_sz), |             flow_control: FlowControl::new(config.init_local_window_sz), | ||||||
|             buffered_data: 0, |             buffered_data: 0, | ||||||
|             buffer: Buffer::new(), |             buffer: Buffer::new(), | ||||||
|  |             conn_task: None, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -71,6 +76,10 @@ impl<B> Prioritize<B> | |||||||
|  |  | ||||||
|         // Queue the stream |         // Queue the stream | ||||||
|         push_sender(&mut self.pending_send, stream); |         push_sender(&mut self.pending_send, stream); | ||||||
|  |  | ||||||
|  |         if let Some(ref task) = self.conn_task { | ||||||
|  |             task.notify(); | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn poll_complete<T>(&mut self, |     pub fn poll_complete<T>(&mut self, | ||||||
| @@ -79,12 +88,16 @@ impl<B> Prioritize<B> | |||||||
|         -> Poll<(), ConnectionError> |         -> Poll<(), ConnectionError> | ||||||
|         where T: AsyncWrite, |         where T: AsyncWrite, | ||||||
|     { |     { | ||||||
|  |         self.conn_task = Some(task::current()); | ||||||
|  |  | ||||||
|  |         trace!("poll_complete"); | ||||||
|         loop { |         loop { | ||||||
|             // Ensure codec is ready |             // Ensure codec is ready | ||||||
|             try_ready!(dst.poll_ready()); |             try_ready!(dst.poll_ready()); | ||||||
|  |  | ||||||
|             match self.pop_frame(store) { |             match self.pop_frame(store) { | ||||||
|                 Some(frame) => { |                 Some(frame) => { | ||||||
|  |                     trace!("writing frame={:?}", frame); | ||||||
|                     // Subtract the data size |                     // Subtract the data size | ||||||
|                     self.buffered_data -= frame.flow_len(); |                     self.buffered_data -= frame.flow_len(); | ||||||
|  |  | ||||||
|   | |||||||
| @@ -79,6 +79,7 @@ impl<B> Send<B> where B: Buf { | |||||||
|                         stream: &mut store::Ptr<B>) |                         stream: &mut store::Ptr<B>) | ||||||
|         -> Result<(), ConnectionError> |         -> Result<(), ConnectionError> | ||||||
|     { |     { | ||||||
|  |         trace!("send_headers; frame={:?}", frame); | ||||||
|         // Update the state |         // Update the state | ||||||
|         stream.state.send_open(self.init_window_sz, frame.is_end_stream())?; |         stream.state.send_open(self.init_window_sz, frame.is_end_stream())?; | ||||||
|  |  | ||||||
| @@ -252,6 +253,26 @@ impl<B> Send<B> where B: Buf { | |||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn window_size(&mut self, stream: &mut Stream<B>) -> usize { | ||||||
|  |         if let Some(flow) = stream.state.send_flow_control() { | ||||||
|  |             // Track the current task | ||||||
|  |             stream.send_task = Some(task::current()); | ||||||
|  |  | ||||||
|  |             // We are observing the window, so apply the pending updates | ||||||
|  |             flow.apply_window_update(); | ||||||
|  |  | ||||||
|  |             let mut window = flow.effective_window_size(); | ||||||
|  |  | ||||||
|  |             if stream.unadvertised_send_window > window { | ||||||
|  |                 return 0; | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             return (window - stream.unadvertised_send_window) as usize; | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         0 | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn dec_num_streams(&mut self) { |     pub fn dec_num_streams(&mut self) { | ||||||
|         self.num_streams -= 1; |         self.num_streams -= 1; | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -190,40 +190,6 @@ impl<B> Streams<B> | |||||||
|         me.actions.recv.recv_push_promise::<P>(frame, &mut stream) |         me.actions.recv.recv_push_promise::<P>(frame, &mut stream) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn send_headers(&mut self, headers: frame::Headers) |  | ||||||
|         -> Result<(), ConnectionError> |  | ||||||
|     { |  | ||||||
|         unimplemented!(); |  | ||||||
|         /* |  | ||||||
|         let id = frame.stream_id(); |  | ||||||
|         let mut me = self.inner.lock().unwrap(); |  | ||||||
|         let me = &mut *me; |  | ||||||
|  |  | ||||||
|         // let (id, state) = me.actions.send.open()); |  | ||||||
|  |  | ||||||
|  |  | ||||||
|         let state = match me.store.entry(id) { |  | ||||||
|             Entry::Occupied(e) => e.into_mut(), |  | ||||||
|             Entry::Vacant(e) => { |  | ||||||
|                 let (id, state) = try!(me.actions.send.open()); |  | ||||||
|                 e.insert(state) |  | ||||||
|             } |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         if frame.is_trailers() { |  | ||||||
|             try!(me.actions.send.send_eos(state)); |  | ||||||
|         } else { |  | ||||||
|             try!(me.actions.send.send_headers(state, frame.is_end_stream())); |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         if state.is_closed() { |  | ||||||
|             me.actions.dec_num_streams(id); |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         Ok(()) |  | ||||||
|         */ |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn next_incoming(&mut self) -> Option<StreamRef<B>> { |     pub fn next_incoming(&mut self) -> Option<StreamRef<B>> { | ||||||
|         let key = { |         let key = { | ||||||
|             let mut me = self.inner.lock().unwrap(); |             let mut me = self.inner.lock().unwrap(); | ||||||
| @@ -399,6 +365,15 @@ impl<B> StreamRef<B> | |||||||
|  |  | ||||||
|         Ok(chunk.into()) |         Ok(chunk.into()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Returns the current window size | ||||||
|  |     pub fn window_size(&mut self) -> usize { | ||||||
|  |         let mut me = self.inner.lock().unwrap(); | ||||||
|  |         let me = &mut *me; | ||||||
|  |  | ||||||
|  |         let mut stream = me.store.resolve(self.key); | ||||||
|  |         me.actions.send.window_size(&mut stream) | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<B> Clone for StreamRef<B> { | impl<B> Clone for StreamRef<B> { | ||||||
|   | |||||||
| @@ -26,6 +26,16 @@ pub struct Stream<B: IntoBuf> { | |||||||
|     inner: proto::StreamRef<B::Buf>, |     inner: proto::StreamRef<B::Buf>, | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[derive(Debug)] | ||||||
|  | pub struct Send<T> { | ||||||
|  |     src: T, | ||||||
|  |     dst: Option<Stream<Bytes>>, | ||||||
|  |     // Pending data | ||||||
|  |     buf: Option<Bytes>, | ||||||
|  |     // True when this is the end of the stream | ||||||
|  |     eos: bool, | ||||||
|  | } | ||||||
|  |  | ||||||
| /// Flush a Sink | /// Flush a Sink | ||||||
| struct Flush<T> { | struct Flush<T> { | ||||||
|     inner: Option<T>, |     inner: Option<T>, | ||||||
| @@ -87,6 +97,7 @@ impl<T, B> Server<T, B> | |||||||
|         Handshake { inner: Box::new(handshake) } |         Handshake { inner: Box::new(handshake) } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Returns `Ready` when the underlying connection has closed. | ||||||
|     pub fn poll_close(&mut self) -> Poll<(), ConnectionError> { |     pub fn poll_close(&mut self) -> Poll<(), ConnectionError> { | ||||||
|         self.connection.poll() |         self.connection.poll() | ||||||
|     } |     } | ||||||
| @@ -141,13 +152,21 @@ impl<T, B> fmt::Debug for Server<T, B> | |||||||
| // ===== impl Stream ===== | // ===== impl Stream ===== | ||||||
|  |  | ||||||
| impl<B: IntoBuf> Stream<B> { | impl<B: IntoBuf> Stream<B> { | ||||||
|  |     /// Returns the current window size. | ||||||
|  |     /// | ||||||
|  |     /// This function also registers interest changes. The current task will be | ||||||
|  |     /// notified when the window size is *increased*. | ||||||
|  |     pub fn window_size(&mut self) -> usize { | ||||||
|  |         self.inner.window_size() | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool) |     pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool) | ||||||
|         -> Result<(), ConnectionError> |         -> Result<(), ConnectionError> | ||||||
|     { |     { | ||||||
|         self.inner.send_response(response, end_of_stream) |         self.inner.send_response(response, end_of_stream) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Send data |     /// Send a single data frame | ||||||
|     pub fn send_data(&mut self, data: B, end_of_stream: bool) |     pub fn send_data(&mut self, data: B, end_of_stream: bool) | ||||||
|         -> Result<(), ConnectionError> |         -> Result<(), ConnectionError> | ||||||
|     { |     { | ||||||
| @@ -162,6 +181,67 @@ impl<B: IntoBuf> Stream<B> { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | impl Stream<Bytes> { | ||||||
|  |     /// Send the body | ||||||
|  |     pub fn send<T>(self, src: T, end_of_stream: bool,) -> Send<T> | ||||||
|  |         where T: futures::Stream<Item = Bytes, Error = ConnectionError>, | ||||||
|  |     { | ||||||
|  |         Send { | ||||||
|  |             src: src, | ||||||
|  |             dst: Some(self), | ||||||
|  |             buf: None, | ||||||
|  |             eos: end_of_stream, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // ===== impl Send ===== | ||||||
|  |  | ||||||
|  | impl<T> Future for Send<T> | ||||||
|  |     where T: futures::Stream<Item = Bytes, Error = ConnectionError>, | ||||||
|  | { | ||||||
|  |     type Item = Stream<Bytes>; | ||||||
|  |     type Error = ConnectionError; | ||||||
|  |  | ||||||
|  |     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||||
|  |         use futures::Stream; | ||||||
|  |  | ||||||
|  |         loop { | ||||||
|  |             if self.buf.is_none() { | ||||||
|  |                 self.buf = try_ready!(self.src.poll()); | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             match self.buf.take() { | ||||||
|  |                 Some(mut buf) => { | ||||||
|  |                     let cap = self.dst.as_mut().unwrap().window_size(); | ||||||
|  |  | ||||||
|  |                     if cap == 0 { | ||||||
|  |                         self.buf = Some(buf); | ||||||
|  |                         return Ok(Async::NotReady); | ||||||
|  |                     } if cap >= buf.len() { | ||||||
|  |                         self.dst.as_mut().unwrap().send_data(buf, false)?; | ||||||
|  |                     } else { | ||||||
|  |                         let chunk = buf.split_to(cap); | ||||||
|  |                         self.buf = Some(buf); | ||||||
|  |                         self.dst.as_mut().unwrap().send_data(chunk, false)?; | ||||||
|  |  | ||||||
|  |                         return Ok(Async::NotReady); | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |                 None => { | ||||||
|  |                     // TODO: It would be nice to not have to send an extra | ||||||
|  |                     // frame... | ||||||
|  |                     if self.eos { | ||||||
|  |                         self.dst.as_mut().unwrap().send_data(Bytes::new(), true)?; | ||||||
|  |                     } | ||||||
|  |  | ||||||
|  |                     return Ok(Async::Ready(self.dst.take().unwrap())); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| // ===== impl Flush ===== | // ===== impl Flush ===== | ||||||
|  |  | ||||||
| impl<T> Flush<T> { | impl<T> Flush<T> { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user