refactor(lib): Remove useless uses of Pin (#2405)
This commit is contained in:
@@ -138,25 +138,22 @@ impl<T, U> Clone for UnboundedSender<T, U> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project::pin_project(PinnedDrop)]
|
|
||||||
pub(crate) struct Receiver<T, U> {
|
pub(crate) struct Receiver<T, U> {
|
||||||
#[pin]
|
|
||||||
inner: mpsc::UnboundedReceiver<Envelope<T, U>>,
|
inner: mpsc::UnboundedReceiver<Envelope<T, U>>,
|
||||||
taker: want::Taker,
|
taker: want::Taker,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, U> Receiver<T, U> {
|
impl<T, U> Receiver<T, U> {
|
||||||
pub(crate) fn poll_next(
|
pub(crate) fn poll_recv(
|
||||||
self: Pin<&mut Self>,
|
&mut self,
|
||||||
cx: &mut task::Context<'_>,
|
cx: &mut task::Context<'_>,
|
||||||
) -> Poll<Option<(T, Callback<T, U>)>> {
|
) -> Poll<Option<(T, Callback<T, U>)>> {
|
||||||
let mut this = self.project();
|
match self.inner.poll_recv(cx) {
|
||||||
match this.inner.poll_recv(cx) {
|
|
||||||
Poll::Ready(item) => {
|
Poll::Ready(item) => {
|
||||||
Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
|
Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
|
||||||
}
|
}
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
this.taker.want();
|
self.taker.want();
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -177,12 +174,11 @@ impl<T, U> Receiver<T, U> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project::pinned_drop]
|
impl<T, U> Drop for Receiver<T, U> {
|
||||||
impl<T, U> PinnedDrop for Receiver<T, U> {
|
fn drop(&mut self) {
|
||||||
fn drop(mut self: Pin<&mut Self>) {
|
|
||||||
// Notify the giver about the closure first, before dropping
|
// Notify the giver about the closure first, before dropping
|
||||||
// the mpsc::Receiver.
|
// the mpsc::Receiver.
|
||||||
self.as_mut().taker.cancel();
|
self.taker.cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -279,8 +275,8 @@ mod tests {
|
|||||||
impl<T, U> Future for Receiver<T, U> {
|
impl<T, U> Future for Receiver<T, U> {
|
||||||
type Output = Option<(T, Callback<T, U>)>;
|
type Output = Option<(T, Callback<T, U>)>;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
self.poll_next(cx)
|
self.poll_recv(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -44,10 +44,8 @@ cfg_server! {
|
|||||||
}
|
}
|
||||||
|
|
||||||
cfg_client! {
|
cfg_client! {
|
||||||
#[pin_project::pin_project]
|
|
||||||
pub(crate) struct Client<B> {
|
pub(crate) struct Client<B> {
|
||||||
callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
|
callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
|
||||||
#[pin]
|
|
||||||
rx: ClientRx<B>,
|
rx: ClientRx<B>,
|
||||||
rx_closed: bool,
|
rx_closed: bool,
|
||||||
}
|
}
|
||||||
@@ -557,12 +555,12 @@ cfg_client! {
|
|||||||
type RecvItem = crate::proto::ResponseHead;
|
type RecvItem = crate::proto::ResponseHead;
|
||||||
|
|
||||||
fn poll_msg(
|
fn poll_msg(
|
||||||
self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut task::Context<'_>,
|
cx: &mut task::Context<'_>,
|
||||||
) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), crate::common::Never>>> {
|
) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), crate::common::Never>>> {
|
||||||
let this = self.project();
|
let mut this = self.as_mut();
|
||||||
debug_assert!(!*this.rx_closed);
|
debug_assert!(!this.rx_closed);
|
||||||
match this.rx.poll_next(cx) {
|
match this.rx.poll_recv(cx) {
|
||||||
Poll::Ready(Some((req, mut cb))) => {
|
Poll::Ready(Some((req, mut cb))) => {
|
||||||
// check that future hasn't been canceled already
|
// check that future hasn't been canceled already
|
||||||
match cb.poll_canceled(cx) {
|
match cb.poll_canceled(cx) {
|
||||||
@@ -578,7 +576,7 @@ cfg_client! {
|
|||||||
headers: parts.headers,
|
headers: parts.headers,
|
||||||
extensions: parts.extensions,
|
extensions: parts.extensions,
|
||||||
};
|
};
|
||||||
*this.callback = Some(cb);
|
this.callback = Some(cb);
|
||||||
Poll::Ready(Some(Ok((head, body))))
|
Poll::Ready(Some(Ok((head, body))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -586,7 +584,7 @@ cfg_client! {
|
|||||||
Poll::Ready(None) => {
|
Poll::Ready(None) => {
|
||||||
// user has dropped sender handle
|
// user has dropped sender handle
|
||||||
trace!("client tx closed");
|
trace!("client tx closed");
|
||||||
*this.rx_closed = true;
|
this.rx_closed = true;
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
}
|
}
|
||||||
Poll::Pending => Poll::Pending,
|
Poll::Pending => Poll::Pending,
|
||||||
|
|||||||
@@ -213,7 +213,7 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
match Pin::new(&mut self.req_rx).poll_next(cx) {
|
match self.req_rx.poll_recv(cx) {
|
||||||
Poll::Ready(Some((req, cb))) => {
|
Poll::Ready(Some((req, cb))) => {
|
||||||
// check that future hasn't been canceled already
|
// check that future hasn't been canceled already
|
||||||
if cb.is_canceled() {
|
if cb.is_canceled() {
|
||||||
|
|||||||
Reference in New Issue
Block a user