Skip to content

Commit

Permalink
sync: implement oneshot::Receiver::is_terminated() (#7152)
Browse files Browse the repository at this point in the history
  • Loading branch information
cratelyn authored Feb 16, 2025
1 parent aa70f6c commit 17117b5
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 3 deletions.
64 changes: 61 additions & 3 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,64 @@ impl<T> Receiver<T> {
}
}

/// Checks if this receiver is terminated.
///
/// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
/// If so, this receiver should no longer be polled.
///
/// # Examples
///
/// Sending a value and polling it.
///
/// ```
/// use tokio::sync::oneshot;
///
/// use std::task::Poll;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel();
///
/// // A receiver is not terminated when it is initialized.
/// assert!(!rx.is_terminated());
///
/// // A receiver is not terminated it is polled and is still pending.
/// let poll = futures::poll!(&mut rx);
/// assert_eq!(poll, Poll::Pending);
/// assert!(!rx.is_terminated());
///
/// // A receiver is not terminated if a value has been sent, but not yet read.
/// tx.send(0).unwrap();
/// assert!(!rx.is_terminated());
///
/// // A receiver *is* terminated after it has been polled and yielded a value.
/// assert_eq!((&mut rx).await, Ok(0));
/// assert!(rx.is_terminated());
/// }
/// ```
///
/// Dropping the sender.
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel::<()>();
///
/// // A receiver is not immediately terminated when the sender is dropped.
/// drop(tx);
/// assert!(!rx.is_terminated());
///
/// // A receiver *is* terminated after it has been polled and yielded an error.
/// let _ = (&mut rx).await.unwrap_err();
/// assert!(rx.is_terminated());
/// }
/// ```
pub fn is_terminated(&self) -> bool {
self.inner.is_none()
}

/// Attempts to receive a value.
///
/// If a pending value exists in the channel, it is returned. If no value
Expand Down Expand Up @@ -1106,18 +1164,18 @@ impl<T> Future for Receiver<T> {

let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
let res = ready!(inner.poll_recv(cx))?;
let res = ready!(inner.poll_recv(cx)).map_err(Into::into);

res
} else {
panic!("called after complete");
};

self.inner = None;
Ready(Ok(ret))
Ready(ret)
}
}

Expand Down
94 changes: 94 additions & 0 deletions tokio/tests/sync_oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,97 @@ fn sender_changes_task() {

assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx)));
}

#[test]
fn receiver_is_terminated_send() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(
!rx.is_terminated(),
"channel is NOT terminated before value is sent"
);
tx.send(17).unwrap();
assert!(
!rx.is_terminated(),
"channel is NOT terminated after value is sent"
);

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_eq!(poll, Ok(17));

assert!(
rx.is_terminated(),
"channel IS terminated after value is read"
);
}

#[test]
fn receiver_is_terminated_try_recv() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(
!rx.is_terminated(),
"channel is NOT terminated before value is sent"
);
tx.send(17).unwrap();
assert!(
!rx.is_terminated(),
"channel is NOT terminated after value is sent"
);

let value = rx.try_recv().expect("value is waiting");
assert_eq!(value, 17);

assert!(
rx.is_terminated(),
"channel IS terminated after value is read"
);
}

#[test]
fn receiver_is_terminated_drop() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(
!rx.is_terminated(),
"channel is NOT terminated before sender is dropped"
);
drop(tx);
assert!(
!rx.is_terminated(),
"channel is NOT terminated after sender is dropped"
);

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_err!(poll);

assert!(
rx.is_terminated(),
"channel IS terminated after value is read"
);
}

#[test]
fn receiver_is_terminated_rx_close() {
let (_tx, mut rx) = oneshot::channel::<i32>();
assert!(
!rx.is_terminated(),
"channel is NOT terminated before closing"
);
rx.close();
assert!(
!rx.is_terminated(),
"channel is NOT terminated before closing"
);

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_err!(poll);

assert!(
rx.is_terminated(),
"channel IS terminated after value is read"
);
}

0 comments on commit 17117b5

Please sign in to comment.