add recv_frame and send_frame extensions for Handle Futures
This commit is contained in:
		| @@ -3,13 +3,13 @@ use {FutureExt, SendFrame}; | ||||
| use h2::{self, SendError, RecvError}; | ||||
| use h2::frame::{self, Frame}; | ||||
|  | ||||
| use futures::{Future, Stream, Poll}; | ||||
| use futures::{Async, Future, Stream, Poll}; | ||||
| use futures::task::{self, Task}; | ||||
|  | ||||
| use tokio_io::{AsyncRead, AsyncWrite}; | ||||
| use tokio_io::io::read_exact; | ||||
|  | ||||
| use std::{cmp, io}; | ||||
| use std::{cmp, fmt, io}; | ||||
| use std::io::ErrorKind::WouldBlock; | ||||
| use std::sync::{Arc, Mutex}; | ||||
|  | ||||
| @@ -303,3 +303,118 @@ impl AsyncWrite for Pipe { | ||||
|         Ok(().into()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub trait HandleFutureExt { | ||||
|     fn recv_settings(self) -> RecvFrame<Box<Future<Item=(Option<Frame>, Handle), Error=()>>> | ||||
|         where Self: Sized + 'static, | ||||
|               Self: Future<Item=(frame::Settings, Handle)>, | ||||
|               Self::Error: fmt::Debug, | ||||
|     { | ||||
|         let map = self.map(|(settings, handle)| (Some(settings.into()), handle)).unwrap(); | ||||
|  | ||||
|         let boxed: Box<Future<Item=(Option<Frame>, Handle), Error=()>> = Box::new(map); | ||||
|         RecvFrame { | ||||
|             inner: boxed, | ||||
|             frame: frame::Settings::default().into(), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn recv_frame<T>(self, frame: T) -> RecvFrame<<Self as IntoRecvFrame>::Future> | ||||
|         where Self: IntoRecvFrame + Sized, | ||||
|               T: Into<Frame>, | ||||
|     { | ||||
|         self.into_recv_frame(frame.into()) | ||||
|     } | ||||
|  | ||||
|     fn send_frame<T>(self, frame: T) -> SendFrameFut<Self> | ||||
|         where Self: Sized, | ||||
|               T: Into<SendFrame>, | ||||
|     { | ||||
|         SendFrameFut { | ||||
|             inner: self, | ||||
|             frame: Some(frame.into()), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub struct RecvFrame<T> { | ||||
|     inner: T, | ||||
|     frame: Frame, | ||||
| } | ||||
|  | ||||
| impl<T> Future for RecvFrame<T> | ||||
|     where T: Future<Item=(Option<Frame>, Handle)>, | ||||
|           T::Error: fmt::Debug, | ||||
| { | ||||
|     type Item = Handle; | ||||
|     type Error = (); | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||
|         let (frame, handle) = match self.inner.poll().unwrap() { | ||||
|             Async::Ready((frame, handle)) => (frame, handle), | ||||
|             Async::NotReady => return Ok(Async::NotReady), | ||||
|         }; | ||||
|         assert_eq!(frame.unwrap(), self.frame); | ||||
|         Ok(Async::Ready(handle)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub struct SendFrameFut<T> { | ||||
|     inner: T, | ||||
|     frame: Option<SendFrame>, | ||||
| } | ||||
|  | ||||
| impl<T> Future for SendFrameFut<T> | ||||
|     where T: Future<Item=Handle>, | ||||
|           T::Error: fmt::Debug, | ||||
| { | ||||
|     type Item = Handle; | ||||
|     type Error = (); | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||
|         let mut handle = match self.inner.poll().unwrap() { | ||||
|             Async::Ready(handle) => handle, | ||||
|             Async::NotReady => return Ok(Async::NotReady), | ||||
|         }; | ||||
|         handle.send(self.frame.take().unwrap()).unwrap(); | ||||
|         Ok(Async::Ready(handle)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T> HandleFutureExt for T | ||||
|     where T: Future + 'static, | ||||
| { | ||||
| } | ||||
|  | ||||
| pub trait IntoRecvFrame { | ||||
|     type Future: Future; | ||||
|     fn into_recv_frame(self, frame: Frame) -> RecvFrame<Self::Future>; | ||||
| } | ||||
|  | ||||
| impl IntoRecvFrame for Handle { | ||||
|     type Future = ::futures::stream::StreamFuture<Self>; | ||||
|  | ||||
|     fn into_recv_frame(self, frame: Frame) -> RecvFrame<Self::Future> { | ||||
|         RecvFrame { | ||||
|             inner: self.into_future(), | ||||
|             frame: frame, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T> IntoRecvFrame for T | ||||
|     where T: Future<Item=Handle> + 'static, | ||||
|           T::Error: fmt::Debug, | ||||
| { | ||||
|     type Future = Box<Future<Item=(Option<Frame>, Handle), Error=()>>; | ||||
|  | ||||
|     fn into_recv_frame(self, frame: Frame) -> RecvFrame<Self::Future> { | ||||
|         let into_fut = Box::new(self.unwrap() | ||||
|             .and_then(|handle| handle.into_future().unwrap()) | ||||
|         ); | ||||
|         RecvFrame { | ||||
|             inner: into_fut, | ||||
|             frame: frame, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -7,7 +7,7 @@ pub use self::h2::client::{self, Client}; | ||||
| pub use self::h2::server::{self, Server}; | ||||
|  | ||||
| // Re-export mock | ||||
| pub use super::mock; | ||||
| pub use super::mock::{self, HandleFutureExt}; | ||||
|  | ||||
| // Re-export frames helpers | ||||
| pub use super::frames; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user