wip: problems with Frame::len()
This commit is contained in:
		| @@ -1,23 +1,23 @@ | ||||
| use {ConnectionError, Frame, FrameSize}; | ||||
| use client::Client; | ||||
| use error; | ||||
| use frame::{self, SettingSet, StreamId}; | ||||
| use proto::{self, ControlFlow, ControlPing, ControlSettings, Peer, PingPayload, ReadySink, WindowSize}; | ||||
| use server::Server; | ||||
|  | ||||
| use tokio_io::{AsyncRead, AsyncWrite}; | ||||
|  | ||||
| use http::{request, response}; | ||||
| use bytes::{Bytes, IntoBuf}; | ||||
|  | ||||
| use http::{request, response}; | ||||
| use futures::*; | ||||
|  | ||||
| use tokio_io::{AsyncRead, AsyncWrite}; | ||||
| use std::marker::PhantomData; | ||||
|  | ||||
| /// An H2 connection | ||||
| #[derive(Debug)] | ||||
| pub struct Connection<T, P, B: IntoBuf = Bytes> { | ||||
|     inner: proto::Transport<T, P, B::Buf>, | ||||
|     peer: PhantomData<P>, | ||||
|     // Set to `true` as long as the connection is in a valid state. | ||||
|     active: bool, | ||||
|     _phantom: PhantomData<(P, B)>, | ||||
| } | ||||
|  | ||||
| pub fn new<T, P, B>(transport: proto::Transport<T, P, B::Buf>) | ||||
| @@ -28,13 +28,14 @@ pub fn new<T, P, B>(transport: proto::Transport<T, P, B::Buf>) | ||||
| { | ||||
|     Connection { | ||||
|         inner: transport, | ||||
|         peer: PhantomData, | ||||
|         active: true, | ||||
|         _phantom: PhantomData, | ||||
|     } | ||||
| } | ||||
|  | ||||
|  | ||||
| impl<T, P, B> ControlSettings for Connection<T, P, B> | ||||
|     where T: ControlSettings, | ||||
|     where T: AsyncRead + AsyncWrite, | ||||
|           B: IntoBuf, | ||||
| { | ||||
|     fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> { | ||||
| @@ -51,7 +52,7 @@ impl<T, P, B> ControlSettings for Connection<T, P, B> | ||||
| } | ||||
|  | ||||
| impl<T, P, B> ControlFlow for Connection<T, P, B> | ||||
|     where T: ControlFlow, | ||||
|     where T: AsyncRead + AsyncWrite, | ||||
|           B: IntoBuf, | ||||
| { | ||||
|     fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> { | ||||
| @@ -65,7 +66,6 @@ impl<T, P, B> ControlFlow for Connection<T, P, B> | ||||
|  | ||||
| impl<T, P, B> ControlPing for Connection<T, P, B> | ||||
|     where T: AsyncRead + AsyncWrite, | ||||
|           T: ControlPing, | ||||
|           P: Peer, | ||||
|           B: IntoBuf, | ||||
| { | ||||
| @@ -146,6 +146,10 @@ impl<T, P, B> Stream for Connection<T, P, B> | ||||
|         use frame::Frame::*; | ||||
|         trace!("poll"); | ||||
|  | ||||
|         if !self.active { | ||||
|             return Err(error::User::Corrupt.into()); | ||||
|         } | ||||
|  | ||||
|         loop { | ||||
|             let frame = match try!(self.inner.poll()) { | ||||
|                 Async::Ready(f) => f, | ||||
| @@ -153,7 +157,7 @@ impl<T, P, B> Stream for Connection<T, P, B> | ||||
|                     // Receiving new frames may depend on ensuring that the write buffer | ||||
|                     // is clear (e.g. if window updates need to be sent), so `poll_complete` | ||||
|                     // is called here.  | ||||
|                     try_ready!(self.inner.poll_complete()); | ||||
|                     try_ready!(self.poll_complete()); | ||||
|  | ||||
|                     // If the write buffer is cleared, attempt to poll the underlying | ||||
|                     // stream once more because it, may have been made ready. | ||||
| @@ -172,7 +176,7 @@ impl<T, P, B> Stream for Connection<T, P, B> | ||||
|                 Some(Data(v)) => Frame::Data { | ||||
|                     id: v.stream_id(), | ||||
|                     end_of_stream: v.is_end_stream(), | ||||
|                     data_len: v.len(), | ||||
|                     //data_len: v.len(), | ||||
|                     data: v.into_payload(), | ||||
|                 }, | ||||
|  | ||||
| @@ -199,9 +203,13 @@ impl<T, P, B> Sink for Connection<T, P, B> | ||||
|     { | ||||
|         trace!("start_send"); | ||||
|  | ||||
|         if !self.active { | ||||
|             return Err(error::User::Corrupt.into()); | ||||
|         } | ||||
|  | ||||
|         // Ensure the transport is ready to send a frame before we transform the external | ||||
|         // `Frame` into an internal `frame::Framme`. | ||||
|         if self.inner.poll_ready()? == Async::NotReady { | ||||
|         // `Frame` into an internal `frame::Frame`. | ||||
|         if !try!(self.poll_ready()).is_ready() { | ||||
|             return Ok(AsyncSink::NotReady(item)); | ||||
|         } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user