Get a request sent

This commit is contained in:
Carl Lerche
2017-06-26 22:25:25 -07:00
parent ac2959e956
commit 7897b770e9
15 changed files with 296 additions and 125 deletions

View File

@@ -13,6 +13,7 @@ pub struct Handshake<T> {
} }
/// Marker type indicating a client peer /// Marker type indicating a client peer
#[derive(Debug)]
pub struct Client; pub struct Client;
pub type Connection<T> = super::Connection<T, Client>; pub type Connection<T> = super::Connection<T, Client>;
@@ -29,9 +30,12 @@ pub fn bind<T>(io: T) -> Handshake<T>
debug!("binding client connection"); debug!("binding client connection");
let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
.map(|(io, _)| { .then(|res| {
let (io, _) = res.unwrap();
debug!("client connection bound"); debug!("client connection bound");
proto::new_connection(io)
// Use default local settings for now
proto::Handshake::new(io, Default::default())
}) })
.map_err(ConnectionError::from); .map_err(ConnectionError::from);
@@ -55,19 +59,23 @@ impl Peer for Client {
fn convert_send_message( fn convert_send_message(
id: StreamId, id: StreamId,
message: Self::Send, headers: Self::Send,
body: bool) -> proto::SendMessage end_of_stream: bool) -> frame::Headers
{ {
use http::request::Head; use http::request::Head;
// Extract the components of the HTTP request // Extract the components of the HTTP request
let Head { method, uri, headers, .. } = message; let Head { method, uri, headers, .. } = headers;
// TODO: Ensure that the version is set to H2
// Build the set pseudo header set. All requests will include `method` // Build the set pseudo header set. All requests will include `method`
// and `path`. // and `path`.
let mut pseudo = frame::Pseudo::request(method, uri.path().into()); let mut pseudo = frame::Pseudo::request(method, uri.path().into());
// If the URI includes a scheme component, add it to the pseudo headers // If the URI includes a scheme component, add it to the pseudo headers
//
// TODO: Scheme must be set...
if let Some(scheme) = uri.scheme() { if let Some(scheme) = uri.scheme() {
pseudo.set_scheme(scheme.into()); pseudo.set_scheme(scheme.into());
} }
@@ -81,18 +89,14 @@ impl Peer for Client {
// Create the HEADERS frame // Create the HEADERS frame
let mut frame = frame::Headers::new(id, pseudo, headers); let mut frame = frame::Headers::new(id, pseudo, headers);
// TODO: Factor in trailers if end_of_stream {
if !body { frame.set_end_stream()
// frame.set_end_stream();
} else {
unimplemented!();
} }
// Return the `SendMessage` frame
proto::SendMessage::new(frame)
} }
fn convert_poll_message(message: proto::PollMessage) -> Frame<Self::Poll> { fn convert_poll_message(headers: frame::Headers) -> Frame<Self::Poll> {
unimplemented!(); unimplemented!();
} }
} }

26
src/frame/go_away.rs Normal file
View File

@@ -0,0 +1,26 @@
use frame::Error;
use super::{head, StreamId};
#[derive(Debug)]
pub struct GoAway {
last_stream_id: StreamId,
error_code: u32,
}
impl GoAway {
pub fn load(payload: &[u8]) -> Result<GoAway, Error> {
if payload.len() < 8 {
// Invalid payload len
// TODO: Handle error
unimplemented!();
}
let last_stream_id = head::parse_stream_id(&payload[..4]);
let error_code = unpack_octets_4!(payload, 4, u32);
Ok(GoAway {
last_stream_id: last_stream_id,
error_code: error_code,
})
}
}

View File

@@ -81,7 +81,8 @@ impl Head {
/// octet is ignored and the rest interpreted as a network-endian 31-bit /// octet is ignored and the rest interpreted as a network-endian 31-bit
/// integer. /// integer.
#[inline] #[inline]
fn parse_stream_id(buf: &[u8]) -> StreamId { pub fn parse_stream_id(buf: &[u8]) -> StreamId {
/// TODO: Move this onto the StreamId type?
let unpacked = unpack_octets_4!(buf, 0, u32); let unpacked = unpack_octets_4!(buf, 0, u32);
// Now clear the most significant bit, as that is reserved and MUST be ignored when received. // Now clear the most significant bit, as that is reserved and MUST be ignored when received.
unpacked & !STREAM_ID_MASK unpacked & !STREAM_ID_MASK

View File

@@ -176,6 +176,10 @@ impl Headers {
self.flags.is_end_headers() self.flags.is_end_headers()
} }
pub fn set_end_stream(&mut self) {
self.flags.set_end_stream()
}
pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut) pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut)
-> Option<Continuation> -> Option<Continuation>
{ {
@@ -208,13 +212,13 @@ impl Headers {
let len = (dst.len() - pos) - frame::HEADER_LEN; let len = (dst.len() - pos) - frame::HEADER_LEN;
// Write the frame length // Write the frame length
BigEndian::write_u32(&mut dst[pos..pos+3], len as u32); BigEndian::write_uint(&mut dst[pos..pos+3], len as u64, 3);
ret ret
} }
fn head(&self) -> Head { fn head(&self) -> Head {
Head::new(Kind::Data, self.flags.into(), self.stream_id) Head::new(Kind::Headers, self.flags.into(), self.stream_id)
} }
} }

View File

@@ -18,6 +18,7 @@ use std::io;
/// ``` /// ```
#[macro_escape] #[macro_escape]
macro_rules! unpack_octets_4 { macro_rules! unpack_octets_4 {
// TODO: Get rid of this macro
($buf:expr, $offset:expr, $tip:ty) => ( ($buf:expr, $offset:expr, $tip:ty) => (
(($buf[$offset + 0] as $tip) << 24) | (($buf[$offset + 0] as $tip) << 24) |
(($buf[$offset + 1] as $tip) << 16) | (($buf[$offset + 1] as $tip) << 16) |
@@ -27,12 +28,14 @@ macro_rules! unpack_octets_4 {
} }
mod data; mod data;
mod go_away;
mod head; mod head;
mod headers; mod headers;
mod settings; mod settings;
mod util; mod util;
pub use self::data::Data; pub use self::data::Data;
pub use self::go_away::GoAway;
pub use self::head::{Head, Kind, StreamId}; pub use self::head::{Head, Kind, StreamId};
pub use self::headers::{Headers, PushPromise, Continuation, Pseudo}; pub use self::headers::{Headers, PushPromise, Continuation, Pseudo};
pub use self::settings::{Settings, SettingSet}; pub use self::settings::{Settings, SettingSet};

View File

@@ -22,6 +22,7 @@ pub struct SettingSet {
/// frame. /// frame.
/// ///
/// Each setting has a value that is a 32 bit unsigned integer (6.5.1.). /// Each setting has a value that is a 32 bit unsigned integer (6.5.1.).
#[derive(Debug)]
pub enum Setting { pub enum Setting {
HeaderTableSize(u32), HeaderTableSize(u32),
EnablePush(u32), EnablePush(u32),
@@ -134,10 +135,15 @@ impl Settings {
let head = Head::new(Kind::Settings, self.flags.into(), 0); let head = Head::new(Kind::Settings, self.flags.into(), 0);
let payload_len = self.payload_len(); let payload_len = self.payload_len();
trace!("encoding SETTINGS; len={}", payload_len);
head.encode(payload_len, dst); head.encode(payload_len, dst);
// Encode the settings // Encode the settings
self.for_each(|setting| setting.encode(dst)); self.for_each(|setting| {
trace!("encoding setting; val={:?}", setting);
setting.encode(dst)
});
} }
fn for_each<F: FnMut(Setting)>(&self, mut f: F) { fn for_each<F: FnMut(Setting)>(&self, mut f: F) {

View File

@@ -10,6 +10,7 @@ use std::io::Cursor;
use std::collections::VecDeque; use std::collections::VecDeque;
/// Decodes headers using HPACK /// Decodes headers using HPACK
#[derive(Debug)]
pub struct Decoder { pub struct Decoder {
// Protocol indicated that the max table size will update // Protocol indicated that the max table size will update
max_size_update: Option<usize>, max_size_update: Option<usize>,
@@ -127,6 +128,7 @@ enum Representation {
SizeUpdate, SizeUpdate,
} }
#[derive(Debug)]
struct Table { struct Table {
entries: VecDeque<Header>, entries: VecDeque<Header>,
size: usize, size: usize,

View File

@@ -1,5 +1,6 @@
#![allow(warnings)] #![allow(warnings)]
#[macro_use]
extern crate futures; extern crate futures;
#[macro_use] #[macro_use]
@@ -38,19 +39,28 @@ pub use proto::Connection;
/// An H2 connection frame /// An H2 connection frame
#[derive(Debug)] #[derive(Debug)]
pub enum Frame<T> { pub enum Frame<T> {
Message { Headers {
id: StreamId, id: StreamId,
message: T, headers: T,
body: bool, end_of_stream: bool,
}, },
Body { Body {
id: StreamId, id: StreamId,
chunk: Option<()>, chunk: (),
end_of_stream: bool,
},
Trailers {
id: StreamId,
headers: (),
},
PushPromise {
id: StreamId,
promise: (),
}, },
Error { Error {
id: StreamId, id: StreamId,
error: (), error: (),
} },
} }
/// Either a Client or a Server /// Either a Client or a Server
@@ -66,9 +76,9 @@ pub trait Peer {
#[doc(hidden)] #[doc(hidden)]
fn convert_send_message( fn convert_send_message(
id: StreamId, id: StreamId,
message: Self::Send, headers: Self::Send,
body: bool) -> proto::SendMessage; end_of_stream: bool) -> frame::Headers;
#[doc(hidden)] #[doc(hidden)]
fn convert_poll_message(message: proto::PollMessage) -> Frame<Self::Poll>; fn convert_poll_message(headers: frame::Headers) -> Frame<Self::Poll>;
} }

View File

@@ -1,10 +1,10 @@
use {frame, Frame, ConnectionError, Peer, StreamId}; use {frame, Frame, ConnectionError, Peer, StreamId};
use client::Client;
use proto::{self, ReadySink, State}; use proto::{self, ReadySink, State};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
use http; use http::{self, request, response};
use futures::*; use futures::*;
@@ -15,74 +15,28 @@ use std::marker::PhantomData;
use std::hash::BuildHasherDefault; use std::hash::BuildHasherDefault;
/// An H2 connection /// An H2 connection
#[derive(Debug)]
pub struct Connection<T, P> { pub struct Connection<T, P> {
inner: Inner<T>, inner: proto::Inner<T>,
streams: StreamMap<State>, streams: StreamMap<State>,
peer: PhantomData<P>, peer: PhantomData<P>,
} }
type Inner<T> = impl<T, P> From<proto::Inner<T>> for Connection<T, P>
proto::Settings<
proto::PingPong<
proto::FramedWrite<
proto::FramedRead<
length_delimited::FramedRead<T>>>>>;
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
/// Returns a new `Connection` backed by the given `io`.
pub fn new<T, P>(io: T) -> Connection<T, P>
where T: AsyncRead + AsyncWrite, where T: AsyncRead + AsyncWrite,
P: Peer, P: Peer,
{ {
fn from(src: proto::Inner<T>) -> Self {
// Delimit the frames Connection {
let framed_read = length_delimited::Builder::new() inner: src,
.big_endian() streams: StreamMap::default(),
.length_field_length(3) peer: PhantomData,
.length_adjustment(9) }
.num_skip(0) // Don't skip the header
.new_read(io);
// Map to `Frame` types
let framed_read = proto::FramedRead::new(framed_read);
// Frame encoder
let mut framed = proto::FramedWrite::new(framed_read);
// Ok, so this is a **little** hacky, but it works for now.
//
// The ping/pong behavior SHOULD be given highest priority (6.7).
// However, the connection handshake requires the settings frame to be
// sent as the very first one. This needs special handling because
// otherwise there is a race condition where the peer could send its
// settings frame followed immediately by a Ping, in which case, we
// don't want to accidentally send the pong before finishing the
// connection hand shake.
//
// So, to ensure correct ordering, we write the settings frame here
// before fully constructing the connection struct. Technically, `Async`
// operations should not be performed in `new` because this might not
// happen on a task, however we have full control of the I/O and we know
// that the settings frame will get buffered and not actually perform an
// I/O op.
let initial_settings = frame::SettingSet::default();
let frame = frame::Settings::new(initial_settings.clone());
assert!(framed.start_send(frame.into()).unwrap().is_ready());
// Add ping/pong handler
let ping_pong = proto::PingPong::new(framed);
// Add settings handler
let connection = proto::Settings::new(ping_pong, initial_settings);
Connection {
inner: connection,
streams: StreamMap::default(),
peer: PhantomData,
} }
} }
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
impl<T, P> Connection<T, P> impl<T, P> Connection<T, P>
where T: AsyncRead + AsyncWrite, where T: AsyncRead + AsyncWrite,
P: Peer, P: Peer,
@@ -94,6 +48,23 @@ impl<T, P> Connection<T, P>
} }
} }
impl<T> Connection<T, Client>
where T: AsyncRead + AsyncWrite,
{
pub fn send_request(self,
id: StreamId, // TODO: Generate one internally?
request: request::Head,
end_of_stream: bool)
-> sink::Send<Self>
{
self.send(Frame::Headers {
id: id,
headers: request,
end_of_stream: end_of_stream,
})
}
}
impl<T, P> Stream for Connection<T, P> impl<T, P> Stream for Connection<T, P>
where T: AsyncRead + AsyncWrite, where T: AsyncRead + AsyncWrite,
P: Peer, P: Peer,
@@ -104,8 +75,15 @@ impl<T, P> Stream for Connection<T, P>
fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> { fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> {
use frame::Frame::*; use frame::Frame::*;
// Because receiving new frames may depend on ensuring that the write
// buffer is clear, `poll_complete` is called here.
let _ = try!(self.poll_complete());
match try_ready!(self.inner.poll()) { match try_ready!(self.inner.poll()) {
Some(Headers(v)) => unimplemented!(), Some(Headers(v)) => {
debug!("poll; frame={:?}", v);
unimplemented!();
}
Some(frame) => panic!("unexpected frame; frame={:?}", frame), Some(frame) => panic!("unexpected frame; frame={:?}", frame),
None => return Ok(Async::Ready(None)), None => return Ok(Async::Ready(None)),
_ => unimplemented!(), _ => unimplemented!(),
@@ -129,7 +107,7 @@ impl<T, P> Sink for Connection<T, P>
} }
match item { match item {
Frame::Message { id, message, body } => { Frame::Headers { id, headers, end_of_stream } => {
// Ensure ID is valid // Ensure ID is valid
try!(P::check_initiating_id(id)); try!(P::check_initiating_id(id));
@@ -138,17 +116,18 @@ impl<T, P> Sink for Connection<T, P>
// connections should not be factored. // connections should not be factored.
// Transition the stream state, creating a new entry if needed // Transition the stream state, creating a new entry if needed
//
// TODO: Response can send multiple headers frames before body
// (1xx responses).
try!(self.streams.entry(id) try!(self.streams.entry(id)
.or_insert(State::default()) .or_insert(State::default())
.send_headers()); .send_headers());
let message = P::convert_send_message(id, message, body); let frame = P::convert_send_message(id, headers, end_of_stream);
// TODO: Handle trailers and all that jazz
// We already ensured that the upstream can handle the frame, so // We already ensured that the upstream can handle the frame, so
// panic if it gets rejected. // panic if it gets rejected.
let res = try!(self.inner.start_send(frame::Frame::Headers(message.frame))); let res = try!(self.inner.start_send(frame::Frame::Headers(frame)));
// This is a one-way conversion. By checking `poll_ready` first, // This is a one-way conversion. By checking `poll_ready` first,
// it's already been determined that the inner `Sink` can accept // it's already been determined that the inner `Sink` can accept
@@ -157,7 +136,13 @@ impl<T, P> Sink for Connection<T, P>
Ok(AsyncSink::Ready) Ok(AsyncSink::Ready)
} }
Frame::Body { id, chunk } => { Frame::Trailers { id, headers } => {
unimplemented!();
}
Frame::Body { id, chunk, end_of_stream } => {
unimplemented!();
}
Frame::PushPromise { id, promise } => {
unimplemented!(); unimplemented!();
} }
Frame::Error { id, error } => { Frame::Error { id, error } => {

View File

@@ -9,6 +9,7 @@ use bytes::{Bytes, BytesMut, Buf};
use std::io::{self, Write, Cursor}; use std::io::{self, Write, Cursor};
#[derive(Debug)]
pub struct FramedRead<T> { pub struct FramedRead<T> {
inner: T, inner: T,
@@ -19,6 +20,7 @@ pub struct FramedRead<T> {
} }
/// Partially loaded headers frame /// Partially loaded headers frame
#[derive(Debug)]
enum Partial { enum Partial {
Headers(frame::Headers), Headers(frame::Headers),
PushPromise(frame::PushPromise), PushPromise(frame::PushPromise),
@@ -71,9 +73,15 @@ impl<T> FramedRead<T> {
} }
Kind::PushPromise => unimplemented!(), Kind::PushPromise => unimplemented!(),
Kind::Ping => unimplemented!(), Kind::Ping => unimplemented!(),
Kind::GoAway => unimplemented!(), Kind::GoAway => {
let frame = try!(frame::GoAway::load(&bytes[frame::HEADER_LEN..]));
debug!("decoded; frame={:?}", frame);
unimplemented!();
}
Kind::WindowUpdate => unimplemented!(), Kind::WindowUpdate => unimplemented!(),
Kind::Continuation => unimplemented!(), Kind::Continuation => {
unimplemented!();
}
Kind::Unknown => return Ok(None), Kind::Unknown => return Ok(None),
}; };

131
src/proto/handshake.rs Normal file
View File

@@ -0,0 +1,131 @@
use {ConnectionError, Peer};
use frame::{self, Frame};
use proto::{self, Connection};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
use futures::{Future, Sink, Stream, Poll, Async, AsyncSink};
use std::marker::PhantomData;
/// Implements the settings component of the initial H2 handshake
pub struct Handshake<T, P> {
// Upstream transport
inner: Option<Inner<T>>,
// True when the local settings have been sent
settings_sent: bool,
// Peer
peer: PhantomData<P>,
}
struct Inner<T> {
// Upstream transport
framed: proto::Framed<T>,
// Our settings
local: frame::SettingSet,
}
impl<T, P> Handshake<T, P>
where T: AsyncRead + AsyncWrite,
{
/// Initiate an HTTP/2.0 handshake.
pub fn new(io: T, local: frame::SettingSet) -> Self {
// Delimit the frames
let framed_read = length_delimited::Builder::new()
.big_endian()
.length_field_length(3)
.length_adjustment(9)
.num_skip(0) // Don't skip the header
.new_read(io);
// Map to `Frame` types
let framed_read = proto::FramedRead::new(framed_read);
// Frame encoder
let mut framed = proto::FramedWrite::new(framed_read);
Handshake {
inner: Some(Inner {
framed: framed,
local: local,
}),
settings_sent: false,
peer: PhantomData,
}
}
/// Returns a reference to the local settings.
///
/// # Panics
///
/// Panics if `HandshakeInner` has already been consumed.
fn local(&self) -> &frame::SettingSet {
&self.inner.as_ref().unwrap().local
}
/// Returns a mutable reference to `HandshakeInner`.
///
/// # Panics
///
/// Panics if `HandshakeInner` has already been consumed.
fn inner_mut(&mut self) -> &mut proto::Framed<T> {
&mut self.inner.as_mut().unwrap().framed
}
}
// Either a client or server. satisfied when we have sent a SETTINGS frame and
// have sent an ACK for the remote's settings.
impl<T, P> Future for Handshake<T, P>
where T: AsyncRead + AsyncWrite,
P: Peer,
{
type Item = Connection<T, P>;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if !self.settings_sent {
let frame = frame::Settings::new(self.local().clone()).into();
if let AsyncSink::NotReady(_) = try!(self.inner_mut().start_send(frame)) {
// This shouldn't really happen, but if it does, try again
// later.
return Ok(Async::NotReady);
}
// Try flushing...
try!(self.inner_mut().poll_complete());
self.settings_sent = true;
}
match try_ready!(self.inner_mut().poll()) {
Some(Frame::Settings(v)) => {
if v.is_ack() {
// TODO: unexpected ACK, protocol error
unimplemented!();
} else {
let remote = v.into_set();
let inner = self.inner.take().unwrap();
// Add ping/pong handler
let ping_pong = proto::PingPong::new(inner.framed);
// Add settings handler
let settings = proto::Settings::new(
ping_pong, inner.local, remote);
// Finally, convert to the `Connection`
let connection = settings.into();
return Ok(Async::Ready(connection));
}
}
// TODO: handle handshake failure
_ => unimplemented!(),
}
}
}

View File

@@ -1,35 +1,30 @@
mod connection; mod connection;
mod framed_read; mod framed_read;
mod framed_write; mod framed_write;
mod handshake;
mod ping_pong; mod ping_pong;
mod ready; mod ready;
mod settings; mod settings;
mod state; mod state;
pub use self::connection::{Connection, new as new_connection}; pub use self::connection::{Connection};
pub use self::framed_read::FramedRead; pub use self::framed_read::FramedRead;
pub use self::framed_write::FramedWrite; pub use self::framed_write::FramedWrite;
pub use self::handshake::Handshake;
pub use self::ping_pong::PingPong; pub use self::ping_pong::PingPong;
pub use self::ready::ReadySink; pub use self::ready::ReadySink;
pub use self::settings::Settings; pub use self::settings::Settings;
pub use self::state::State; pub use self::state::State;
use frame; use tokio_io::codec::length_delimited;
/// A request or response issued by the current process. /// Base HTTP/2.0 transport. Only handles framing.
pub struct SendMessage { type Framed<T> =
frame: frame::Headers, FramedWrite<
} FramedRead<
length_delimited::FramedRead<T>>>;
/// A request or response received by the current process. type Inner<T> =
pub struct PollMessage { Settings<
frame: frame::Headers, PingPong<
} Framed<T>>>;
impl SendMessage {
pub fn new(frame: frame::Headers) -> Self {
SendMessage {
frame: frame,
}
}
}

View File

@@ -4,6 +4,7 @@ use proto::ReadySink;
use futures::*; use futures::*;
#[derive(Debug)]
pub struct PingPong<T> { pub struct PingPong<T> {
inner: T, inner: T,
} }

View File

@@ -4,6 +4,7 @@ use proto::ReadySink;
use futures::*; use futures::*;
#[derive(Debug)]
pub struct Settings<T> { pub struct Settings<T> {
// Upstream transport // Upstream transport
inner: T, inner: T,
@@ -21,22 +22,17 @@ pub struct Settings<T> {
is_dirty: bool, is_dirty: bool,
} }
/*
* TODO:
* - Settings ack timeout for connection error
*/
impl<T> Settings<T> impl<T> Settings<T>
where T: Stream<Item = Frame, Error = ConnectionError>, where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame, SinkError = ConnectionError>, T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
{ {
pub fn new(inner: T, local: frame::SettingSet) -> Settings<T> { pub fn new(inner: T, local: frame::SettingSet, remote: frame::SettingSet) -> Settings<T> {
Settings { Settings {
inner: inner, inner: inner,
local: local, local: local,
remote: frame::SettingSet::default(), remote: remote,
remaining_acks: 0, remaining_acks: 1,
is_dirty: true, is_dirty: false,
} }
} }
@@ -60,7 +56,8 @@ impl<T> Settings<T>
fn try_send(&mut self, item: frame::Settings) -> Poll<(), ConnectionError> { fn try_send(&mut self, item: frame::Settings) -> Poll<(), ConnectionError> {
if let AsyncSink::NotReady(_) = try!(self.inner.start_send(item.into())) { if let AsyncSink::NotReady(_) = try!(self.inner.start_send(item.into())) {
// Ensure that call to `poll_complete` guarantee is called to satisfied // TODO: I don't think this is needed actually... It was originally
// done to "satisfy the start_send" contract...
try!(self.inner.poll_complete()); try!(self.inner.poll_complete());
return Ok(Async::NotReady); return Ok(Async::NotReady);
@@ -117,14 +114,12 @@ impl<T> Sink for Settings<T>
} }
fn poll_complete(&mut self) -> Poll<(), ConnectionError> { fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.try_send_pending());
self.inner.poll_complete() self.inner.poll_complete()
} }
fn close(&mut self) -> Poll<(), ConnectionError> { fn close(&mut self) -> Poll<(), ConnectionError> {
if !try!(self.try_send_pending()).is_ready() { try_ready!(self.try_send_pending());
return Ok(Async::NotReady);
}
self.inner.close() self.inner.close()
} }
} }

View File