summaryrefslogtreecommitdiff
path: root/src/libstd/sync/mpsc
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2020-07-27 17:39:01 +0000
committerbors <bors@rust-lang.org>2020-07-27 17:39:01 +0000
commit54e000891ffccd4cbfb92146b92736c83085df63 (patch)
tree1200bb13eb9ae22def4c43bc657bc56da8faedc6 /src/libstd/sync/mpsc
parent4a90e36c85336d1d4b209556c1a9733210bbff19 (diff)
parent6d9705220fec4553d693a7c19d99496e14c89edf (diff)
downloadrust-tmp-nightly.tar.gz
Auto merge of #73265 - mark-i-m:mv-std, r=<try>tmp-nightly
mv std libs to library/ This is the first step in refactoring the directory layout of this repository, with further followup steps planned (but not done yet). Background: currently, all crates are under src/, without nested src directories and with the unconventional `lib*` prefixes (e.g., `src/libcore/lib.rs`). This directory structures is not idiomatic and makes the `src/` directory rather overwhelming. To improve contributor experience and make things a bit more approachable, we are reorganizing the repo a bit. In this PR, we move the standard libs (basically anything that is "runtime", as opposed to part of the compiler, build system, or one of the tools, etc). The new layout moves these libraries to a new `library/` directory in the root of the repo. Additionally, we remove the `lib*` prefixes and add nested `src/` directories. The other crates/tools in this repo are not touched. So in summary: ``` library/<crate>/src/*.rs src/<all the rest> // unchanged ``` where `<crate>` is: - core - alloc - std - test - proc_macro - panic_abort - panic_unwind - profiler_builtins - term - unwind - rtstartup - backtrace - rustc-std-workspace-* There was a lot of discussion about this and a few rounds of compiler team approvals, FCPs, MCPs, and nominations. The original MCP is https://github.com/rust-lang/compiler-team/issues/298. The final approval of the compiler team was given here: https://github.com/rust-lang/rust/pull/73265#issuecomment-659498446. The name `library` was chosen to complement a later move of the compiler crates to a `compiler/` directory. There was a lot of discussion around adding the nested `src/` directories. Note that this does increase the nesting depth (plausibly important for manual traversal of the tree, e.g., through GitHub's UI or `cd`), but this is deemed to be better as it fits the standard layout of Rust crates throughout most of the ecosystem, though there is some debate about how much this should apply to multi-crate projects. Overall, there seem to be more people in favor of nested `src/` than against. After this PR, there are no dependencies out of the `library/` directory except on the `build_helper` (or crates.io crates).
Diffstat (limited to 'src/libstd/sync/mpsc')
-rw-r--r--src/libstd/sync/mpsc/blocking.rs79
-rw-r--r--src/libstd/sync/mpsc/cache_aligned.rs27
-rw-r--r--src/libstd/sync/mpsc/mod.rs3033
-rw-r--r--src/libstd/sync/mpsc/mpsc_queue.rs165
-rw-r--r--src/libstd/sync/mpsc/oneshot.rs307
-rw-r--r--src/libstd/sync/mpsc/shared.rs489
-rw-r--r--src/libstd/sync/mpsc/spsc_queue.rs338
-rw-r--r--src/libstd/sync/mpsc/stream.rs453
-rw-r--r--src/libstd/sync/mpsc/sync.rs495
9 files changed, 0 insertions, 5386 deletions
diff --git a/src/libstd/sync/mpsc/blocking.rs b/src/libstd/sync/mpsc/blocking.rs
deleted file mode 100644
index d34de6a4fac..00000000000
--- a/src/libstd/sync/mpsc/blocking.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-//! Generic support for building blocking abstractions.
-
-use crate::mem;
-use crate::sync::atomic::{AtomicBool, Ordering};
-use crate::sync::Arc;
-use crate::thread::{self, Thread};
-use crate::time::Instant;
-
-struct Inner {
- thread: Thread,
- woken: AtomicBool,
-}
-
-unsafe impl Send for Inner {}
-unsafe impl Sync for Inner {}
-
-#[derive(Clone)]
-pub struct SignalToken {
- inner: Arc<Inner>,
-}
-
-pub struct WaitToken {
- inner: Arc<Inner>,
-}
-
-impl !Send for WaitToken {}
-
-impl !Sync for WaitToken {}
-
-pub fn tokens() -> (WaitToken, SignalToken) {
- let inner = Arc::new(Inner { thread: thread::current(), woken: AtomicBool::new(false) });
- let wait_token = WaitToken { inner: inner.clone() };
- let signal_token = SignalToken { inner };
- (wait_token, signal_token)
-}
-
-impl SignalToken {
- pub fn signal(&self) -> bool {
- let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst);
- if wake {
- self.inner.thread.unpark();
- }
- wake
- }
-
- /// Converts to an unsafe usize value. Useful for storing in a pipe's state
- /// flag.
- #[inline]
- pub unsafe fn cast_to_usize(self) -> usize {
- mem::transmute(self.inner)
- }
-
- /// Converts from an unsafe usize value. Useful for retrieving a pipe's state
- /// flag.
- #[inline]
- pub unsafe fn cast_from_usize(signal_ptr: usize) -> SignalToken {
- SignalToken { inner: mem::transmute(signal_ptr) }
- }
-}
-
-impl WaitToken {
- pub fn wait(self) {
- while !self.inner.woken.load(Ordering::SeqCst) {
- thread::park()
- }
- }
-
- /// Returns `true` if we wake up normally.
- pub fn wait_max_until(self, end: Instant) -> bool {
- while !self.inner.woken.load(Ordering::SeqCst) {
- let now = Instant::now();
- if now >= end {
- return false;
- }
- thread::park_timeout(end - now)
- }
- true
- }
-}
diff --git a/src/libstd/sync/mpsc/cache_aligned.rs b/src/libstd/sync/mpsc/cache_aligned.rs
deleted file mode 100644
index b0842144328..00000000000
--- a/src/libstd/sync/mpsc/cache_aligned.rs
+++ /dev/null
@@ -1,27 +0,0 @@
-use crate::ops::{Deref, DerefMut};
-
-#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
-#[repr(align(64))]
-pub(super) struct Aligner;
-
-#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
-pub(super) struct CacheAligned<T>(pub T, pub Aligner);
-
-impl<T> Deref for CacheAligned<T> {
- type Target = T;
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-impl<T> DerefMut for CacheAligned<T> {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
- }
-}
-
-impl<T> CacheAligned<T> {
- pub(super) fn new(t: T) -> Self {
- CacheAligned(t, Aligner)
- }
-}
diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs
deleted file mode 100644
index 3ff50e9f213..00000000000
--- a/src/libstd/sync/mpsc/mod.rs
+++ /dev/null
@@ -1,3033 +0,0 @@
-// ignore-tidy-filelength
-
-//! Multi-producer, single-consumer FIFO queue communication primitives.
-//!
-//! This module provides message-based communication over channels, concretely
-//! defined among three types:
-//!
-//! * [`Sender`]
-//! * [`SyncSender`]
-//! * [`Receiver`]
-//!
-//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
-//! senders are clone-able (multi-producer) such that many threads can send
-//! simultaneously to one receiver (single-consumer).
-//!
-//! These channels come in two flavors:
-//!
-//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
-//! will return a `(Sender, Receiver)` tuple where all sends will be
-//! **asynchronous** (they never block). The channel conceptually has an
-//! infinite buffer.
-//!
-//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
-//! return a `(SyncSender, Receiver)` tuple where the storage for pending
-//! messages is a pre-allocated buffer of a fixed size. All sends will be
-//! **synchronous** by blocking until there is buffer space available. Note
-//! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
-//! channel where each sender atomically hands off a message to a receiver.
-//!
-//! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
-//! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
-//! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
-//! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
-//! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
-//! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
-//!
-//! ## Disconnection
-//!
-//! The send and receive operations on channels will all return a [`Result`]
-//! indicating whether the operation succeeded or not. An unsuccessful operation
-//! is normally indicative of the other half of a channel having "hung up" by
-//! being dropped in its corresponding thread.
-//!
-//! Once half of a channel has been deallocated, most operations can no longer
-//! continue to make progress, so [`Err`] will be returned. Many applications
-//! will continue to [`unwrap`] the results returned from this module,
-//! instigating a propagation of failure among threads if one unexpectedly dies.
-//!
-//! [`Result`]: ../../../std/result/enum.Result.html
-//! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
-//! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
-//!
-//! # Examples
-//!
-//! Simple usage:
-//!
-//! ```
-//! use std::thread;
-//! use std::sync::mpsc::channel;
-//!
-//! // Create a simple streaming channel
-//! let (tx, rx) = channel();
-//! thread::spawn(move|| {
-//! tx.send(10).unwrap();
-//! });
-//! assert_eq!(rx.recv().unwrap(), 10);
-//! ```
-//!
-//! Shared usage:
-//!
-//! ```
-//! use std::thread;
-//! use std::sync::mpsc::channel;
-//!
-//! // Create a shared channel that can be sent along from many threads
-//! // where tx is the sending half (tx for transmission), and rx is the receiving
-//! // half (rx for receiving).
-//! let (tx, rx) = channel();
-//! for i in 0..10 {
-//! let tx = tx.clone();
-//! thread::spawn(move|| {
-//! tx.send(i).unwrap();
-//! });
-//! }
-//!
-//! for _ in 0..10 {
-//! let j = rx.recv().unwrap();
-//! assert!(0 <= j && j < 10);
-//! }
-//! ```
-//!
-//! Propagating panics:
-//!
-//! ```
-//! use std::sync::mpsc::channel;
-//!
-//! // The call to recv() will return an error because the channel has already
-//! // hung up (or been deallocated)
-//! let (tx, rx) = channel::<i32>();
-//! drop(tx);
-//! assert!(rx.recv().is_err());
-//! ```
-//!
-//! Synchronous channels:
-//!
-//! ```
-//! use std::thread;
-//! use std::sync::mpsc::sync_channel;
-//!
-//! let (tx, rx) = sync_channel::<i32>(0);
-//! thread::spawn(move|| {
-//! // This will wait for the parent thread to start receiving
-//! tx.send(53).unwrap();
-//! });
-//! rx.recv().unwrap();
-//! ```
-
-#![stable(feature = "rust1", since = "1.0.0")]
-
-// A description of how Rust's channel implementation works
-//
-// Channels are supposed to be the basic building block for all other
-// concurrent primitives that are used in Rust. As a result, the channel type
-// needs to be highly optimized, flexible, and broad enough for use everywhere.
-//
-// The choice of implementation of all channels is to be built on lock-free data
-// structures. The channels themselves are then consequently also lock-free data
-// structures. As always with lock-free code, this is a very "here be dragons"
-// territory, especially because I'm unaware of any academic papers that have
-// gone into great length about channels of these flavors.
-//
-// ## Flavors of channels
-//
-// From the perspective of a consumer of this library, there is only one flavor
-// of channel. This channel can be used as a stream and cloned to allow multiple
-// senders. Under the hood, however, there are actually three flavors of
-// channels in play.
-//
-// * Flavor::Oneshots - these channels are highly optimized for the one-send use
-// case. They contain as few atomics as possible and
-// involve one and exactly one allocation.
-// * Streams - these channels are optimized for the non-shared use case. They
-// use a different concurrent queue that is more tailored for this
-// use case. The initial allocation of this flavor of channel is not
-// optimized.
-// * Shared - this is the most general form of channel that this module offers,
-// a channel with multiple senders. This type is as optimized as it
-// can be, but the previous two types mentioned are much faster for
-// their use-cases.
-//
-// ## Concurrent queues
-//
-// The basic idea of Rust's Sender/Receiver types is that send() never blocks,
-// but recv() obviously blocks. This means that under the hood there must be
-// some shared and concurrent queue holding all of the actual data.
-//
-// With two flavors of channels, two flavors of queues are also used. We have
-// chosen to use queues from a well-known author that are abbreviated as SPSC
-// and MPSC (single producer, single consumer and multiple producer, single
-// consumer). SPSC queues are used for streams while MPSC queues are used for
-// shared channels.
-//
-// ### SPSC optimizations
-//
-// The SPSC queue found online is essentially a linked list of nodes where one
-// half of the nodes are the "queue of data" and the other half of nodes are a
-// cache of unused nodes. The unused nodes are used such that an allocation is
-// not required on every push() and a free doesn't need to happen on every
-// pop().
-//
-// As found online, however, the cache of nodes is of an infinite size. This
-// means that if a channel at one point in its life had 50k items in the queue,
-// then the queue will always have the capacity for 50k items. I believed that
-// this was an unnecessary limitation of the implementation, so I have altered
-// the queue to optionally have a bound on the cache size.
-//
-// By default, streams will have an unbounded SPSC queue with a small-ish cache
-// size. The hope is that the cache is still large enough to have very fast
-// send() operations while not too large such that millions of channels can
-// coexist at once.
-//
-// ### MPSC optimizations
-//
-// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
-// a linked list under the hood to earn its unboundedness, but I have not put
-// forth much effort into having a cache of nodes similar to the SPSC queue.
-//
-// For now, I believe that this is "ok" because shared channels are not the most
-// common type, but soon we may wish to revisit this queue choice and determine
-// another candidate for backend storage of shared channels.
-//
-// ## Overview of the Implementation
-//
-// Now that there's a little background on the concurrent queues used, it's
-// worth going into much more detail about the channels themselves. The basic
-// pseudocode for a send/recv are:
-//
-//
-// send(t) recv()
-// queue.push(t) return if queue.pop()
-// if increment() == -1 deschedule {
-// wakeup() if decrement() > 0
-// cancel_deschedule()
-// }
-// queue.pop()
-//
-// As mentioned before, there are no locks in this implementation, only atomic
-// instructions are used.
-//
-// ### The internal atomic counter
-//
-// Every channel has a shared counter with each half to keep track of the size
-// of the queue. This counter is used to abort descheduling by the receiver and
-// to know when to wake up on the sending side.
-//
-// As seen in the pseudocode, senders will increment this count and receivers
-// will decrement the count. The theory behind this is that if a sender sees a
-// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
-// then it doesn't need to block.
-//
-// The recv() method has a beginning call to pop(), and if successful, it needs
-// to decrement the count. It is a crucial implementation detail that this
-// decrement does *not* happen to the shared counter. If this were the case,
-// then it would be possible for the counter to be very negative when there were
-// no receivers waiting, in which case the senders would have to determine when
-// it was actually appropriate to wake up a receiver.
-//
-// Instead, the "steal count" is kept track of separately (not atomically
-// because it's only used by receivers), and then the decrement() call when
-// descheduling will lump in all of the recent steals into one large decrement.
-//
-// The implication of this is that if a sender sees a -1 count, then there's
-// guaranteed to be a waiter waiting!
-//
-// ## Native Implementation
-//
-// A major goal of these channels is to work seamlessly on and off the runtime.
-// All of the previous race conditions have been worded in terms of
-// scheduler-isms (which is obviously not available without the runtime).
-//
-// For now, native usage of channels (off the runtime) will fall back onto
-// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
-// is still entirely lock-free, the "deschedule" blocks above are surrounded by
-// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
-// condition variable.
-//
-// ## Select
-//
-// Being able to support selection over channels has greatly influenced this
-// design, and not only does selection need to work inside the runtime, but also
-// outside the runtime.
-//
-// The implementation is fairly straightforward. The goal of select() is not to
-// return some data, but only to return which channel can receive data without
-// blocking. The implementation is essentially the entire blocking procedure
-// followed by an increment as soon as its woken up. The cancellation procedure
-// involves an increment and swapping out of to_wake to acquire ownership of the
-// thread to unblock.
-//
-// Sadly this current implementation requires multiple allocations, so I have
-// seen the throughput of select() be much worse than it should be. I do not
-// believe that there is anything fundamental that needs to change about these
-// channels, however, in order to support a more efficient select().
-//
-// FIXME: Select is now removed, so these factors are ready to be cleaned up!
-//
-// # Conclusion
-//
-// And now that you've seen all the races that I found and attempted to fix,
-// here's the code for you to find some more!
-
-use crate::cell::UnsafeCell;
-use crate::error;
-use crate::fmt;
-use crate::mem;
-use crate::sync::Arc;
-use crate::time::{Duration, Instant};
-
-mod blocking;
-mod mpsc_queue;
-mod oneshot;
-mod shared;
-mod spsc_queue;
-mod stream;
-mod sync;
-
-mod cache_aligned;
-
-/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
-/// This half can only be owned by one thread.
-///
-/// Messages sent to the channel can be retrieved using [`recv`].
-///
-/// [`channel`]: fn.channel.html
-/// [`sync_channel`]: fn.sync_channel.html
-/// [`recv`]: struct.Receiver.html#method.recv
-///
-/// # Examples
-///
-/// ```rust
-/// use std::sync::mpsc::channel;
-/// use std::thread;
-/// use std::time::Duration;
-///
-/// let (send, recv) = channel();
-///
-/// thread::spawn(move || {
-/// send.send("Hello world!").unwrap();
-/// thread::sleep(Duration::from_secs(2)); // block for two seconds
-/// send.send("Delayed for 2 seconds").unwrap();
-/// });
-///
-/// println!("{}", recv.recv().unwrap()); // Received immediately
-/// println!("Waiting...");
-/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
-/// ```
-#[stable(feature = "rust1", since = "1.0.0")]
-pub struct Receiver<T> {
- inner: UnsafeCell<Flavor<T>>,
-}
-
-// The receiver port can be sent from place to place, so long as it
-// is not used to receive non-sendable things.
-#[stable(feature = "rust1", since = "1.0.0")]
-unsafe impl<T: Send> Send for Receiver<T> {}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> !Sync for Receiver<T> {}
-
-/// An iterator over messages on a [`Receiver`], created by [`iter`].
-///
-/// This iterator will block whenever [`next`] is called,
-/// waiting for a new message, and [`None`] will be returned
-/// when the corresponding channel has hung up.
-///
-/// [`iter`]: struct.Receiver.html#method.iter
-/// [`Receiver`]: struct.Receiver.html
-/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
-/// [`None`]: ../../../std/option/enum.Option.html#variant.None
-///
-/// # Examples
-///
-/// ```rust
-/// use std::sync::mpsc::channel;
-/// use std::thread;
-///
-/// let (send, recv) = channel();
-///
-/// thread::spawn(move || {
-/// send.send(1u8).unwrap();
-/// send.send(2u8).unwrap();
-/// send.send(3u8).unwrap();
-/// });
-///
-/// for x in recv.iter() {
-/// println!("Got: {}", x);
-/// }
-/// ```
-#[stable(feature = "rust1", since = "1.0.0")]
-#[derive(Debug)]
-pub struct Iter<'a, T: 'a> {
- rx: &'a Receiver<T>,
-}
-
-/// An iterator that attempts to yield all pending values for a [`Receiver`],
-/// created by [`try_iter`].
-///
-/// [`None`] will be returned when there are no pending values remaining or
-/// if the corresponding channel has hung up.
-///
-/// This iterator will never block the caller in order to wait for data to
-/// become available. Instead, it will return [`None`].
-///
-/// [`Receiver`]: struct.Receiver.html
-/// [`try_iter`]: struct.Receiver.html#method.try_iter
-/// [`None`]: ../../../std/option/enum.Option.html#variant.None
-///
-/// # Examples
-///
-/// ```rust
-/// use std::sync::mpsc::channel;
-/// use std::thread;
-/// use std::time::Duration;
-///
-/// let (sender, receiver) = channel();
-///
-/// // Nothing is in the buffer yet
-/// assert!(receiver.try_iter().next().is_none());
-/// println!("Nothing in the buffer...");
-///
-/// thread::spawn(move || {
-/// sender.send(1).unwrap();
-/// sender.send(2).unwrap();
-/// sender.send(3).unwrap();
-/// });
-///
-/// println!("Going to sleep...");
-/// thread::sleep(Duration::from_secs(2)); // block for two seconds
-///
-/// for x in receiver.try_iter() {
-/// println!("Got: {}", x);
-/// }
-/// ```
-#[stable(feature = "receiver_try_iter", since = "1.15.0")]
-#[derive(Debug)]
-pub struct TryIter<'a, T: 'a> {
- rx: &'a Receiver<T>,
-}
-
-/// An owning iterator over messages on a [`Receiver`],
-/// created by **Receiver::into_iter**.
-///
-/// This iterator will block whenever [`next`]
-/// is called, waiting for a new message, and [`None`] will be
-/// returned if the corresponding channel has hung up.
-///
-/// [`Receiver`]: struct.Receiver.html
-/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
-/// [`None`]: ../../../std/option/enum.Option.html#variant.None
-///
-/// # Examples
-///
-/// ```rust
-/// use std::sync::mpsc::channel;
-/// use std::thread;
-///
-/// let (send, recv) = channel();
-///
-/// thread::spawn(move || {
-/// send.send(1u8).unwrap();
-/// send.send(2u8).unwrap();
-/// send.send(3u8).unwrap();
-/// });
-///
-/// for x in recv.into_iter() {
-/// println!("Got: {}", x);
-/// }
-/// ```
-#[stable(feature = "receiver_into_iter", since = "1.1.0")]
-#[derive(Debug)]
-pub struct IntoIter<T> {
- rx: Receiver<T>,
-}
-
-/// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
-/// owned by one thread, but it can be cloned to send to other threads.
-///
-/// Messages can be sent through this channel with [`send`].
-///
-/// [`channel`]: fn.channel.html
-/// [`send`]: struct.Sender.html#method.send
-///
-/// # Examples
-///
-/// ```rust
-/// use std::sync::mpsc::channel;
-/// use std::thread;
-///
-/// let (sender, receiver) = channel();
-/// let sender2 = sender.clone();
-///
-/// // First thread owns sender
-/// thread::spawn(move || {
-/// sender.send(1).unwrap();
-/// });
-///
-/// // Second thread owns sender2
-/// thread::spawn(move || {
-/// sender2.send(2).unwrap();
-/// });
-///
-/// let msg = receiver.recv().unwrap();
-/// let msg2 = receiver.recv().unwrap();
-///
-/// assert_eq!(3, msg + msg2);
-/// ```
-#[stable(feature = "rust1", since = "1.0.0")]
-pub struct Sender<T> {
- inner: UnsafeCell<Flavor<T>>,
-}
-
-// The send port can be sent from place to place, so long as it
-// is not used to send non-sendable things.
-#[stable(feature = "rust1", since = "1.0.0")]
-unsafe impl<T: Send> Send for Sender<T> {}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> !Sync for Sender<T> {}
-
-/// The sending-half of Rust's synchronous [`sync_channel`] type.
-///
-/// Messages can be sent through this channel with [`send`] or [`try_send`].
-///
-/// [`send`] will block if there is no space in the internal buffer.
-///
-/// [`sync_channel`]: fn.sync_channel.html
-/// [`send`]: struct.SyncSender.html#method.send
-/// [`try_send`]: struct.SyncSender.html#method.try_send
-///
-/// # Examples
-///
-/// ```rust
-/// use std::sync::mpsc::sync_channel;
-/// use std::thread;
-///
-/// // Create a sync_channel with buffer size 2
-/// let (sync_sender, receiver) = sync_channel(2);
-/// let sync_sender2 = sync_sender.clone();
-///
-/// // First thread owns sync_sender
-/// thread::spawn(move || {
-/// sync_sender.send(1).unwrap();
-/// sync_sender.send(2).unwrap();
-/// });
-///
-/// // Second thread owns sync_sender2
-/// thread::spawn(move || {
-/// sync_sender2.send(3).unwrap();
-/// // thread will now block since the buffer is full
-/// println!("Thread unblocked!");
-/// });
-///
-/// let mut msg;
-///
-/// msg = receiver.recv().unwrap();
-/// println!("message {} received", msg);
-///
-/// // "Thread unblocked!" will be printed now
-///
-/// msg = receiver.recv().unwrap();
-/// println!("message {} received", msg);
-///
-/// msg = receiver.recv().unwrap();
-///
-/// println!("message {} received", msg);
-/// ```
-#[stable(feature = "rust1", since = "1.0.0")]
-pub struct SyncSender<T> {
- inner: Arc<sync::Packet<T>>,
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-unsafe impl<T: Send> Send for SyncSender<T> {}
-
-/// An error returned from the [`Sender::send`] or [`SyncSender::send`]
-/// function on **channel**s.
-///
-/// A **send** operation can only fail if the receiving end of a channel is
-/// disconnected, implying that the data could never be received. The error
-/// contains the data being sent as a payload so it can be recovered.
-///
-/// [`Sender::send`]: struct.Sender.html#method.send
-/// [`SyncSender::send`]: struct.SyncSender.html#method.send
-#[stable(feature = "rust1", since = "1.0.0")]
-#[derive(PartialEq, Eq, Clone, Copy)]
-pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
-
-/// An error returned from the [`recv`] function on a [`Receiver`].
-///
-/// The [`recv`] operation can only fail if the sending half of a
-/// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further
-/// messages will ever be received.
-///
-/// [`recv`]: struct.Receiver.html#method.recv
-/// [`Receiver`]: struct.Receiver.html
-/// [`channel`]: fn.channel.html
-/// [`sync_channel`]: fn.sync_channel.html
-#[derive(PartialEq, Eq, Clone, Copy, Debug)]
-#[stable(feature = "rust1", since = "1.0.0")]
-pub struct RecvError;
-
-/// This enumeration is the list of the possible reasons that [`try_recv`] could
-/// not return data when called. This can occur with both a [`channel`] and
-/// a [`sync_channel`].
-///
-/// [`try_recv`]: struct.Receiver.html#method.try_recv
-/// [`channel`]: fn.channel.html
-/// [`sync_channel`]: fn.sync_channel.html
-#[derive(PartialEq, Eq, Clone, Copy, Debug)]
-#[stable(feature = "rust1", since = "1.0.0")]
-pub enum TryRecvError {
- /// This **channel** is currently empty, but the **Sender**(s) have not yet
- /// disconnected, so data may yet become available.
- #[stable(feature = "rust1", since = "1.0.0")]
- Empty,
-
- /// The **channel**'s sending half has become disconnected, and there will
- /// never be any more data received on it.
- #[stable(feature = "rust1", since = "1.0.0")]
- Disconnected,
-}
-
-/// This enumeration is the list of possible errors that made [`recv_timeout`]
-/// unable to return data when called. This can occur with both a [`channel`] and
-/// a [`sync_channel`].
-///
-/// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
-/// [`channel`]: fn.channel.html
-/// [`sync_channel`]: fn.sync_channel.html
-#[derive(PartialEq, Eq, Clone, Copy, Debug)]
-#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
-pub enum RecvTimeoutError {
- /// This **channel** is currently empty, but the **Sender**(s) have not yet
- /// disconnected, so data may yet become available.
- #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
- Timeout,
- /// The **channel**'s sending half has become disconnected, and there will
- /// never be any more data received on it.
- #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
- Disconnected,
-}
-
-/// This enumeration is the list of the possible error outcomes for the
-/// [`try_send`] method.
-///
-/// [`try_send`]: struct.SyncSender.html#method.try_send
-#[stable(feature = "rust1", since = "1.0.0")]
-#[derive(PartialEq, Eq, Clone, Copy)]
-pub enum TrySendError<T> {
- /// The data could not be sent on the [`sync_channel`] because it would require that
- /// the callee block to send the data.
- ///
- /// If this is a buffered channel, then the buffer is full at this time. If
- /// this is not a buffered channel, then there is no [`Receiver`] available to
- /// acquire the data.
- ///
- /// [`sync_channel`]: fn.sync_channel.html
- /// [`Receiver`]: struct.Receiver.html
- #[stable(feature = "rust1", since = "1.0.0")]
- Full(#[stable(feature = "rust1", since = "1.0.0")] T),
-
- /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
- /// sent. The data is returned back to the callee in this case.
- ///
- /// [`sync_channel`]: fn.sync_channel.html
- #[stable(feature = "rust1", since = "1.0.0")]
- Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
-}
-
-enum Flavor<T> {
- Oneshot(Arc<oneshot::Packet<T>>),
- Stream(Arc<stream::Packet<T>>),
- Shared(Arc<shared::Packet<T>>),
- Sync(Arc<sync::Packet<T>>),
-}
-
-#[doc(hidden)]
-trait UnsafeFlavor<T> {
- fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
- unsafe fn inner_mut(&self) -> &mut Flavor<T> {
- &mut *self.inner_unsafe().get()
- }
- unsafe fn inner(&self) -> &Flavor<T> {
- &*self.inner_unsafe().get()
- }
-}
-impl<T> UnsafeFlavor<T> for Sender<T> {
- fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
- &self.inner
- }
-}
-impl<T> UnsafeFlavor<T> for Receiver<T> {
- fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
- &self.inner
- }
-}
-
-/// Creates a new asynchronous channel, returning the sender/receiver halves.
-/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
-/// the same order as it was sent, and no [`send`] will block the calling thread
-/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
-/// block after its buffer limit is reached). [`recv`] will block until a message
-/// is available.
-///
-/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
-/// only one [`Receiver`] is supported.
-///
-/// If the [`Receiver`] is disconnected while trying to [`send`] with the
-/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
-/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
-/// return a [`RecvError`].
-///
-/// [`send`]: struct.Sender.html#method.send
-/// [`recv`]: struct.Receiver.html#method.recv
-/// [`Sender`]: struct.Sender.html
-/// [`Receiver`]: struct.Receiver.html
-/// [`sync_channel`]: fn.sync_channel.html
-/// [`SendError`]: struct.SendError.html
-/// [`RecvError`]: struct.RecvError.html
-///
-/// # Examples
-///
-/// ```
-/// use std::sync::mpsc::channel;
-/// use std::thread;
-///
-/// let (sender, receiver) = channel();
-///
-/// // Spawn off an expensive computation
-/// thread::spawn(move|| {
-/// # fn expensive_computation() {}
-/// sender.send(expensive_computation()).unwrap();
-/// });
-///
-/// // Do some useful work for awhile
-///
-/// // Let's see what that answer was
-/// println!("{:?}", receiver.recv().unwrap());
-/// ```
-#[stable(feature = "rust1", since = "1.0.0")]
-pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
- let a = Arc::new(oneshot::Packet::new());
- (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
-}
-
-/// Creates a new synchronous, bounded channel.
-/// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
-/// in the same order as it was sent. Like asynchronous [`channel`]s, the
-/// [`Receiver`] will block until a message becomes available. `sync_channel`
-/// differs greatly in the semantics of the sender, however.
-///
-/// This channel has an internal buffer on which messages will be queued.
-/// `bound` specifies the buffer size. When the internal buffer becomes full,
-/// future sends will *block* waiting for the buffer to open up. Note that a
-/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
-/// where each [`send`] will not return until a [`recv`] is paired with it.
-///
-/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
-/// times, but only one [`Receiver`] is supported.
-///
-/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
-/// to [`send`] with the [`SyncSender`], the [`send`] method will return a
-/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
-/// to [`recv`], the [`recv`] method will return a [`RecvError`].
-///
-/// [`channel`]: fn.channel.html
-/// [`send`]: struct.SyncSender.html#method.send
-/// [`recv`]: struct.Receiver.html#method.recv
-/// [`SyncSender`]: struct.SyncSender.html
-/// [`Receiver`]: struct.Receiver.html
-/// [`SendError`]: struct.SendError.html
-/// [`RecvError`]: struct.RecvError.html
-///
-/// # Examples
-///
-/// ```
-/// use std::sync::mpsc::sync_channel;
-/// use std::thread;
-///
-/// let (sender, receiver) = sync_channel(1);
-///
-/// // this returns immediately
-/// sender.send(1).unwrap();
-///
-/// thread::spawn(move|| {
-/// // this will block until the previous message has been received
-/// sender.send(2).unwrap();
-/// });
-///
-/// assert_eq!(receiver.recv().unwrap(), 1);
-/// assert_eq!(receiver.recv().unwrap(), 2);
-/// ```
-#[stable(feature = "rust1", since = "1.0.0")]
-pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
- let a = Arc::new(sync::Packet::new(bound));
- (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Sender
-////////////////////////////////////////////////////////////////////////////////
-
-impl<T> Sender<T> {
- fn new(inner: Flavor<T>) -> Sender<T> {
- Sender { inner: UnsafeCell::new(inner) }
- }
-
- /// Attempts to send a value on this channel, returning it back if it could
- /// not be sent.
- ///
- /// A successful send occurs when it is determined that the other end of
- /// the channel has not hung up already. An unsuccessful send would be one
- /// where the corresponding receiver has already been deallocated. Note
- /// that a return value of [`Err`] means that the data will never be
- /// received, but a return value of [`Ok`] does *not* mean that the data
- /// will be received. It is possible for the corresponding receiver to
- /// hang up immediately after this function returns [`Ok`].
- ///
- /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
- /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
- ///
- /// This method will never block the current thread.
- ///
- /// # Examples
- ///
- /// ```
- /// use std::sync::mpsc::channel;
- ///
- /// let (tx, rx) = channel();
- ///
- /// // This send is always successful
- /// tx.send(1).unwrap();
- ///
- /// // This send will fail because the receiver is gone
- /// drop(rx);
- /// assert_eq!(tx.send(1).unwrap_err().0, 1);
- /// ```
- #[stable(feature = "rust1", since = "1.0.0")]
- pub fn send(&self, t: T) -> Result<(), SendError<T>> {
- let (new_inner, ret) = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => {
- if !p.sent() {
- return p.send(t).map_err(SendError);
- } else {
- let a = Arc::new(stream::Packet::new());
- let rx = Receiver::new(Flavor::Stream(a.clone()));
- match p.upgrade(rx) {
- oneshot::UpSuccess => {
- let ret = a.send(t);
- (a, ret)
- }
- oneshot::UpDisconnected => (a, Err(t)),
- oneshot::UpWoke(token) => {
- // This send cannot panic because the thread is
- // asleep (we're looking at it), so the receiver
- // can't go away.
- a.send(t).ok().unwrap();
- token.signal();
- (a, Ok(()))
- }
- }
- }
- }
- Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
- Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
- Flavor::Sync(..) => unreachable!(),
- };
-
- unsafe {
- let tmp = Sender::new(Flavor::Stream(new_inner));
- mem::swap(self.inner_mut(), tmp.inner_mut());
- }
- ret.map_err(SendError)
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> Clone for Sender<T> {
- fn clone(&self) -> Sender<T> {
- let packet = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => {
- let a = Arc::new(shared::Packet::new());
- {
- let guard = a.postinit_lock();
- let rx = Receiver::new(Flavor::Shared(a.clone()));
- let sleeper = match p.upgrade(rx) {
- oneshot::UpSuccess | oneshot::UpDisconnected => None,
- oneshot::UpWoke(task) => Some(task),
- };
- a.inherit_blocker(sleeper, guard);
- }
- a
- }
- Flavor::Stream(ref p) => {
- let a = Arc::new(shared::Packet::new());
- {
- let guard = a.postinit_lock();
- let rx = Receiver::new(Flavor::Shared(a.clone()));
- let sleeper = match p.upgrade(rx) {
- stream::UpSuccess | stream::UpDisconnected => None,
- stream::UpWoke(task) => Some(task),
- };
- a.inherit_blocker(sleeper, guard);
- }
- a
- }
- Flavor::Shared(ref p) => {
- p.clone_chan();
- return Sender::new(Flavor::Shared(p.clone()));
- }
- Flavor::Sync(..) => unreachable!(),
- };
-
- unsafe {
- let tmp = Sender::new(Flavor::Shared(packet.clone()));
- mem::swap(self.inner_mut(), tmp.inner_mut());
- }
- Sender::new(Flavor::Shared(packet))
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> Drop for Sender<T> {
- fn drop(&mut self) {
- match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => p.drop_chan(),
- Flavor::Stream(ref p) => p.drop_chan(),
- Flavor::Shared(ref p) => p.drop_chan(),
- Flavor::Sync(..) => unreachable!(),
- }
- }
-}
-
-#[stable(feature = "mpsc_debug", since = "1.8.0")]
-impl<T> fmt::Debug for Sender<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Sender").finish()
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// SyncSender
-////////////////////////////////////////////////////////////////////////////////
-
-impl<T> SyncSender<T> {
- fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
- SyncSender { inner }
- }
-
- /// Sends a value on this synchronous channel.
- ///
- /// This function will *block* until space in the internal buffer becomes
- /// available or a receiver is available to hand off the message to.
- ///
- /// Note that a successful send does *not* guarantee that the receiver will
- /// ever see the data if there is a buffer on this channel. Items may be
- /// enqueued in the internal buffer for the receiver to receive at a later
- /// time. If the buffer size is 0, however, the channel becomes a rendezvous
- /// channel and it guarantees that the receiver has indeed received
- /// the data if this function returns success.
- ///
- /// This function will never panic, but it may return [`Err`] if the
- /// [`Receiver`] has disconnected and is no longer able to receive
- /// information.
- ///
- /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
- /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
- ///
- /// # Examples
- ///
- /// ```rust
- /// use std::sync::mpsc::sync_channel;
- /// use std::thread;
- ///
- /// // Create a rendezvous sync_channel with buffer size 0
- /// let (sync_sender, receiver) = sync_channel(0);
- ///
- /// thread::spawn(move || {
- /// println!("sending message...");
- /// sync_sender.send(1).unwrap();
- /// // Thread is now blocked until the message is received
- ///
- /// println!("...message received!");
- /// });
- ///
- /// let msg = receiver.recv().unwrap();
- /// assert_eq!(1, msg);
- /// ```
- #[stable(feature = "rust1", since = "1.0.0")]
- pub fn send(&self, t: T) -> Result<(), SendError<T>> {
- self.inner.send(t).map_err(SendError)
- }
-
- /// Attempts to send a value on this channel without blocking.
- ///
- /// This method differs from [`send`] by returning immediately if the
- /// channel's buffer is full or no receiver is waiting to acquire some
- /// data. Compared with [`send`], this function has two failure cases
- /// instead of one (one for disconnection, one for a full buffer).
- ///
- /// See [`send`] for notes about guarantees of whether the
- /// receiver has received the data or not if this function is successful.
- ///
- /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
- ///
- /// # Examples
- ///
- /// ```rust
- /// use std::sync::mpsc::sync_channel;
- /// use std::thread;
- ///
- /// // Create a sync_channel with buffer size 1
- /// let (sync_sender, receiver) = sync_channel(1);
- /// let sync_sender2 = sync_sender.clone();
- ///
- /// // First thread owns sync_sender
- /// thread::spawn(move || {
- /// sync_sender.send(1).unwrap();
- /// sync_sender.send(2).unwrap();
- /// // Thread blocked
- /// });
- ///
- /// // Second thread owns sync_sender2
- /// thread::spawn(move || {
- /// // This will return an error and send
- /// // no message if the buffer is full
- /// let _ = sync_sender2.try_send(3);
- /// });
- ///
- /// let mut msg;
- /// msg = receiver.recv().unwrap();
- /// println!("message {} received", msg);
- ///
- /// msg = receiver.recv().unwrap();
- /// println!("message {} received", msg);
- ///
- /// // Third message may have never been sent
- /// match receiver.try_recv() {
- /// Ok(msg) => println!("message {} received", msg),
- /// Err(_) => println!("the third message was never sent"),
- /// }
- /// ```
- #[stable(feature = "rust1", since = "1.0.0")]
- pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
- self.inner.try_send(t)
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> Clone for SyncSender<T> {
- fn clone(&self) -> SyncSender<T> {
- self.inner.clone_chan();
- SyncSender::new(self.inner.clone())
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> Drop for SyncSender<T> {
- fn drop(&mut self) {
- self.inner.drop_chan();
- }
-}
-
-#[stable(feature = "mpsc_debug", since = "1.8.0")]
-impl<T> fmt::Debug for SyncSender<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("SyncSender").finish()
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Receiver
-////////////////////////////////////////////////////////////////////////////////
-
-impl<T> Receiver<T> {
- fn new(inner: Flavor<T>) -> Receiver<T> {
- Receiver { inner: UnsafeCell::new(inner) }
- }
-
- /// Attempts to return a pending value on this receiver without blocking.
- ///
- /// This method will never block the caller in order to wait for data to
- /// become available. Instead, this will always return immediately with a
- /// possible option of pending data on the channel.
- ///
- /// This is useful for a flavor of "optimistic check" before deciding to
- /// block on a receiver.
- ///
- /// Compared with [`recv`], this function has two failure cases instead of one
- /// (one for disconnection, one for an empty buffer).
- ///
- /// [`recv`]: struct.Receiver.html#method.recv
- ///
- /// # Examples
- ///
- /// ```rust
- /// use std::sync::mpsc::{Receiver, channel};
- ///
- /// let (_, receiver): (_, Receiver<i32>) = channel();
- ///
- /// assert!(receiver.try_recv().is_err());
- /// ```
- #[stable(feature = "rust1", since = "1.0.0")]
- pub fn try_recv(&self) -> Result<T, TryRecvError> {
- loop {
- let new_port = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(oneshot::Empty) => return Err(TryRecvError::Empty),
- Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
- Err(oneshot::Upgraded(rx)) => rx,
- },
- Flavor::Stream(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(stream::Empty) => return Err(TryRecvError::Empty),
- Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
- Err(stream::Upgraded(rx)) => rx,
- },
- Flavor::Shared(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(shared::Empty) => return Err(TryRecvError::Empty),
- Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
- },
- Flavor::Sync(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(sync::Empty) => return Err(TryRecvError::Empty),
- Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
- },
- };
- unsafe {
- mem::swap(self.inner_mut(), new_port.inner_mut());
- }
- }
- }
-
- /// Attempts to wait for a value on this receiver, returning an error if the
- /// corresponding channel has hung up.
- ///
- /// This function will always block the current thread if there is no data
- /// available and it's possible for more data to be sent. Once a message is
- /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
- /// receiver will wake up and return that message.
- ///
- /// If the corresponding [`Sender`] has disconnected, or it disconnects while
- /// this call is blocking, this call will wake up and return [`Err`] to
- /// indicate that no more messages can ever be received on this channel.
- /// However, since channels are buffered, messages sent before the disconnect
- /// will still be properly received.
- ///
- /// [`Sender`]: struct.Sender.html
- /// [`SyncSender`]: struct.SyncSender.html
- /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
- ///
- /// # Examples
- ///
- /// ```
- /// use std::sync::mpsc;
- /// use std::thread;
- ///
- /// let (send, recv) = mpsc::channel();
- /// let handle = thread::spawn(move || {
- /// send.send(1u8).unwrap();
- /// });
- ///
- /// handle.join().unwrap();
- ///
- /// assert_eq!(Ok(1), recv.recv());
- /// ```
- ///
- /// Buffering behavior:
- ///
- /// ```
- /// use std::sync::mpsc;
- /// use std::thread;
- /// use std::sync::mpsc::RecvError;
- ///
- /// let (send, recv) = mpsc::channel();
- /// let handle = thread::spawn(move || {
- /// send.send(1u8).unwrap();
- /// send.send(2).unwrap();
- /// send.send(3).unwrap();
- /// drop(send);
- /// });
- ///
- /// // wait for the thread to join so we ensure the sender is dropped
- /// handle.join().unwrap();
- ///
- /// assert_eq!(Ok(1), recv.recv());
- /// assert_eq!(Ok(2), recv.recv());
- /// assert_eq!(Ok(3), recv.recv());
- /// assert_eq!(Err(RecvError), recv.recv());
- /// ```
- #[stable(feature = "rust1", since = "1.0.0")]
- pub fn recv(&self) -> Result<T, RecvError> {
- loop {
- let new_port = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(oneshot::Disconnected) => return Err(RecvError),
- Err(oneshot::Upgraded(rx)) => rx,
- Err(oneshot::Empty) => unreachable!(),
- },
- Flavor::Stream(ref p) => match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(stream::Disconnected) => return Err(RecvError),
- Err(stream::Upgraded(rx)) => rx,
- Err(stream::Empty) => unreachable!(),
- },
- Flavor::Shared(ref p) => match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(shared::Disconnected) => return Err(RecvError),
- Err(shared::Empty) => unreachable!(),
- },
- Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
- };
- unsafe {
- mem::swap(self.inner_mut(), new_port.inner_mut());
- }
- }
- }
-
- /// Attempts to wait for a value on this receiver, returning an error if the
- /// corresponding channel has hung up, or if it waits more than `timeout`.
- ///
- /// This function will always block the current thread if there is no data
- /// available and it's possible for more data to be sent. Once a message is
- /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
- /// receiver will wake up and return that message.
- ///
- /// If the corresponding [`Sender`] has disconnected, or it disconnects while
- /// this call is blocking, this call will wake up and return [`Err`] to
- /// indicate that no more messages can ever be received on this channel.
- /// However, since channels are buffered, messages sent before the disconnect
- /// will still be properly received.
- ///
- /// [`Sender`]: struct.Sender.html
- /// [`SyncSender`]: struct.SyncSender.html
- /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
- ///
- /// # Known Issues
- ///
- /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
- /// to panic unexpectedly with the following example:
- ///
- /// ```no_run
- /// use std::sync::mpsc::channel;
- /// use std::thread;
- /// use std::time::Duration;
- ///
- /// let (tx, rx) = channel::<String>();
- ///
- /// thread::spawn(move || {
- /// let d = Duration::from_millis(10);
- /// loop {
- /// println!("recv");
- /// let _r = rx.recv_timeout(d);
- /// }
- /// });
- ///
- /// thread::sleep(Duration::from_millis(100));
- /// let _c1 = tx.clone();
- ///
- /// thread::sleep(Duration::from_secs(1));
- /// ```
- ///
- /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
- ///
- /// # Examples
- ///
- /// Successfully receiving value before encountering timeout:
- ///
- /// ```no_run
- /// use std::thread;
- /// use std::time::Duration;
- /// use std::sync::mpsc;
- ///
- /// let (send, recv) = mpsc::channel();
- ///
- /// thread::spawn(move || {
- /// send.send('a').unwrap();
- /// });
- ///
- /// assert_eq!(
- /// recv.recv_timeout(Duration::from_millis(400)),
- /// Ok('a')
- /// );
- /// ```
- ///
- /// Receiving an error upon reaching timeout:
- ///
- /// ```no_run
- /// use std::thread;
- /// use std::time::Duration;
- /// use std::sync::mpsc;
- ///
- /// let (send, recv) = mpsc::channel();
- ///
- /// thread::spawn(move || {
- /// thread::sleep(Duration::from_millis(800));
- /// send.send('a').unwrap();
- /// });
- ///
- /// assert_eq!(
- /// recv.recv_timeout(Duration::from_millis(400)),
- /// Err(mpsc::RecvTimeoutError::Timeout)
- /// );
- /// ```
- #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
- pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
- // Do an optimistic try_recv to avoid the performance impact of
- // Instant::now() in the full-channel case.
- match self.try_recv() {
- Ok(result) => Ok(result),
- Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
- Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
- Some(deadline) => self.recv_deadline(deadline),
- // So far in the future that it's practically the same as waiting indefinitely.
- None => self.recv().map_err(RecvTimeoutError::from),
- },
- }
- }
-
- /// Attempts to wait for a value on this receiver, returning an error if the
- /// corresponding channel has hung up, or if `deadline` is reached.
- ///
- /// This function will always block the current thread if there is no data
- /// available and it's possible for more data to be sent. Once a message is
- /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
- /// receiver will wake up and return that message.
- ///
- /// If the corresponding [`Sender`] has disconnected, or it disconnects while
- /// this call is blocking, this call will wake up and return [`Err`] to
- /// indicate that no more messages can ever be received on this channel.
- /// However, since channels are buffered, messages sent before the disconnect
- /// will still be properly received.
- ///
- /// [`Sender`]: struct.Sender.html
- /// [`SyncSender`]: struct.SyncSender.html
- /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
- ///
- /// # Examples
- ///
- /// Successfully receiving value before reaching deadline:
- ///
- /// ```no_run
- /// #![feature(deadline_api)]
- /// use std::thread;
- /// use std::time::{Duration, Instant};
- /// use std::sync::mpsc;
- ///
- /// let (send, recv) = mpsc::channel();
- ///
- /// thread::spawn(move || {
- /// send.send('a').unwrap();
- /// });
- ///
- /// assert_eq!(
- /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
- /// Ok('a')
- /// );
- /// ```
- ///
- /// Receiving an error upon reaching deadline:
- ///
- /// ```no_run
- /// #![feature(deadline_api)]
- /// use std::thread;
- /// use std::time::{Duration, Instant};
- /// use std::sync::mpsc;
- ///
- /// let (send, recv) = mpsc::channel();
- ///
- /// thread::spawn(move || {
- /// thread::sleep(Duration::from_millis(800));
- /// send.send('a').unwrap();
- /// });
- ///
- /// assert_eq!(
- /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
- /// Err(mpsc::RecvTimeoutError::Timeout)
- /// );
- /// ```
- #[unstable(feature = "deadline_api", issue = "46316")]
- pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
- use self::RecvTimeoutError::*;
-
- loop {
- let port_or_empty = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(oneshot::Disconnected) => return Err(Disconnected),
- Err(oneshot::Upgraded(rx)) => Some(rx),
- Err(oneshot::Empty) => None,
- },
- Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(stream::Disconnected) => return Err(Disconnected),
- Err(stream::Upgraded(rx)) => Some(rx),
- Err(stream::Empty) => None,
- },
- Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(shared::Disconnected) => return Err(Disconnected),
- Err(shared::Empty) => None,
- },
- Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(sync::Disconnected) => return Err(Disconnected),
- Err(sync::Empty) => None,
- },
- };
-
- if let Some(new_port) = port_or_empty {
- unsafe {
- mem::swap(self.inner_mut(), new_port.inner_mut());
- }
- }
-
- // If we're already passed the deadline, and we're here without
- // data, return a timeout, else try again.
- if Instant::now() >= deadline {
- return Err(Timeout);
- }
- }
- }
-
- /// Returns an iterator that will block waiting for messages, but never
- /// [`panic!`]. It will return [`None`] when the channel has hung up.
- ///
- /// [`panic!`]: ../../../std/macro.panic.html
- /// [`None`]: ../../../std/option/enum.Option.html#variant.None
- ///
- /// # Examples
- ///
- /// ```rust
- /// use std::sync::mpsc::channel;
- /// use std::thread;
- ///
- /// let (send, recv) = channel();
- ///
- /// thread::spawn(move || {
- /// send.send(1).unwrap();
- /// send.send(2).unwrap();
- /// send.send(3).unwrap();
- /// });
- ///
- /// let mut iter = recv.iter();
- /// assert_eq!(iter.next(), Some(1));
- /// assert_eq!(iter.next(), Some(2));
- /// assert_eq!(iter.next(), Some(3));
- /// assert_eq!(iter.next(), None);
- /// ```
- #[stable(feature = "rust1", since = "1.0.0")]
- pub fn iter(&self) -> Iter<'_, T> {
- Iter { rx: self }
- }
-
- /// Returns an iterator that will attempt to yield all pending values.
- /// It will return `None` if there are no more pending values or if the
- /// channel has hung up. The iterator will never [`panic!`] or block the
- /// user by waiting for values.
- ///
- /// [`panic!`]: ../../../std/macro.panic.html
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use std::sync::mpsc::channel;
- /// use std::thread;
- /// use std::time::Duration;
- ///
- /// let (sender, receiver) = channel();
- ///
- /// // nothing is in the buffer yet
- /// assert!(receiver.try_iter().next().is_none());
- ///
- /// thread::spawn(move || {
- /// thread::sleep(Duration::from_secs(1));
- /// sender.send(1).unwrap();
- /// sender.send(2).unwrap();
- /// sender.send(3).unwrap();
- /// });
- ///
- /// // nothing is in the buffer yet
- /// assert!(receiver.try_iter().next().is_none());
- ///
- /// // block for two seconds
- /// thread::sleep(Duration::from_secs(2));
- ///
- /// let mut iter = receiver.try_iter();
- /// assert_eq!(iter.next(), Some(1));
- /// assert_eq!(iter.next(), Some(2));
- /// assert_eq!(iter.next(), Some(3));
- /// assert_eq!(iter.next(), None);
- /// ```
- #[stable(feature = "receiver_try_iter", since = "1.15.0")]
- pub fn try_iter(&self) -> TryIter<'_, T> {
- TryIter { rx: self }
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<'a, T> Iterator for Iter<'a, T> {
- type Item = T;
-
- fn next(&mut self) -> Option<T> {
- self.rx.recv().ok()
- }
-}
-
-#[stable(feature = "receiver_try_iter", since = "1.15.0")]
-impl<'a, T> Iterator for TryIter<'a, T> {
- type Item = T;
-
- fn next(&mut self) -> Option<T> {
- self.rx.try_recv().ok()
- }
-}
-
-#[stable(feature = "receiver_into_iter", since = "1.1.0")]
-impl<'a, T> IntoIterator for &'a Receiver<T> {
- type Item = T;
- type IntoIter = Iter<'a, T>;
-
- fn into_iter(self) -> Iter<'a, T> {
- self.iter()
- }
-}
-
-#[stable(feature = "receiver_into_iter", since = "1.1.0")]
-impl<T> Iterator for IntoIter<T> {
- type Item = T;
- fn next(&mut self) -> Option<T> {
- self.rx.recv().ok()
- }
-}
-
-#[stable(feature = "receiver_into_iter", since = "1.1.0")]
-impl<T> IntoIterator for Receiver<T> {
- type Item = T;
- type IntoIter = IntoIter<T>;
-
- fn into_iter(self) -> IntoIter<T> {
- IntoIter { rx: self }
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> Drop for Receiver<T> {
- fn drop(&mut self) {
- match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => p.drop_port(),
- Flavor::Stream(ref p) => p.drop_port(),
- Flavor::Shared(ref p) => p.drop_port(),
- Flavor::Sync(ref p) => p.drop_port(),
- }
- }
-}
-
-#[stable(feature = "mpsc_debug", since = "1.8.0")]
-impl<T> fmt::Debug for Receiver<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Receiver").finish()
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> fmt::Debug for SendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- "SendError(..)".fmt(f)
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> fmt::Display for SendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- "sending on a closed channel".fmt(f)
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T: Send> error::Error for SendError<T> {
- #[allow(deprecated)]
- fn description(&self) -> &str {
- "sending on a closed channel"
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> fmt::Debug for TrySendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match *self {
- TrySendError::Full(..) => "Full(..)".fmt(f),
- TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
- }
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> fmt::Display for TrySendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match *self {
- TrySendError::Full(..) => "sending on a full channel".fmt(f),
- TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
- }
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T: Send> error::Error for TrySendError<T> {
- #[allow(deprecated)]
- fn description(&self) -> &str {
- match *self {
- TrySendError::Full(..) => "sending on a full channel",
- TrySendError::Disconnected(..) => "sending on a closed channel",
- }
- }
-}
-
-#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
-impl<T> From<SendError<T>> for TrySendError<T> {
- fn from(err: SendError<T>) -> TrySendError<T> {
- match err {
- SendError(t) => TrySendError::Disconnected(t),
- }
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl fmt::Display for RecvError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- "receiving on a closed channel".fmt(f)
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl error::Error for RecvError {
- #[allow(deprecated)]
- fn description(&self) -> &str {
- "receiving on a closed channel"
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl fmt::Display for TryRecvError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match *self {
- TryRecvError::Empty => "receiving on an empty channel".fmt(f),
- TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
- }
- }
-}
-
-#[stable(feature = "rust1", since = "1.0.0")]
-impl error::Error for TryRecvError {
- #[allow(deprecated)]
- fn description(&self) -> &str {
- match *self {
- TryRecvError::Empty => "receiving on an empty channel",
- TryRecvError::Disconnected => "receiving on a closed channel",
- }
- }
-}
-
-#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
-impl From<RecvError> for TryRecvError {
- fn from(err: RecvError) -> TryRecvError {
- match err {
- RecvError => TryRecvError::Disconnected,
- }
- }
-}
-
-#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
-impl fmt::Display for RecvTimeoutError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match *self {
- RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
- RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
- }
- }
-}
-
-#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
-impl error::Error for RecvTimeoutError {
- #[allow(deprecated)]
- fn description(&self) -> &str {
- match *self {
- RecvTimeoutError::Timeout => "timed out waiting on channel",
- RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
- }
- }
-}
-
-#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
-impl From<RecvError> for RecvTimeoutError {
- fn from(err: RecvError) -> RecvTimeoutError {
- match err {
- RecvError => RecvTimeoutError::Disconnected,
- }
- }
-}
-
-#[cfg(all(test, not(target_os = "emscripten")))]
-mod tests {
- use super::*;
- use crate::env;
- use crate::thread;
- use crate::time::{Duration, Instant};
-
- pub fn stress_factor() -> usize {
- match env::var("RUST_TEST_STRESS") {
- Ok(val) => val.parse().unwrap(),
- Err(..) => 1,
- }
- }
-
- #[test]
- fn smoke() {
- let (tx, rx) = channel::<i32>();
- tx.send(1).unwrap();
- assert_eq!(rx.recv().unwrap(), 1);
- }
-
- #[test]
- fn drop_full() {
- let (tx, _rx) = channel::<Box<isize>>();
- tx.send(box 1).unwrap();
- }
-
- #[test]
- fn drop_full_shared() {
- let (tx, _rx) = channel::<Box<isize>>();
- drop(tx.clone());
- drop(tx.clone());
- tx.send(box 1).unwrap();
- }
-
- #[test]
- fn smoke_shared() {
- let (tx, rx) = channel::<i32>();
- tx.send(1).unwrap();
- assert_eq!(rx.recv().unwrap(), 1);
- let tx = tx.clone();
- tx.send(1).unwrap();
- assert_eq!(rx.recv().unwrap(), 1);
- }
-
- #[test]
- fn smoke_threads() {
- let (tx, rx) = channel::<i32>();
- let _t = thread::spawn(move || {
- tx.send(1).unwrap();
- });
- assert_eq!(rx.recv().unwrap(), 1);
- }
-
- #[test]
- fn smoke_port_gone() {
- let (tx, rx) = channel::<i32>();
- drop(rx);
- assert!(tx.send(1).is_err());
- }
-
- #[test]
- fn smoke_shared_port_gone() {
- let (tx, rx) = channel::<i32>();
- drop(rx);
- assert!(tx.send(1).is_err())
- }
-
- #[test]
- fn smoke_shared_port_gone2() {
- let (tx, rx) = channel::<i32>();
- drop(rx);
- let tx2 = tx.clone();
- drop(tx);
- assert!(tx2.send(1).is_err());
- }
-
- #[test]
- fn port_gone_concurrent() {
- let (tx, rx) = channel::<i32>();
- let _t = thread::spawn(move || {
- rx.recv().unwrap();
- });
- while tx.send(1).is_ok() {}
- }
-
- #[test]
- fn port_gone_concurrent_shared() {
- let (tx, rx) = channel::<i32>();
- let tx2 = tx.clone();
- let _t = thread::spawn(move || {
- rx.recv().unwrap();
- });
- while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
- }
-
- #[test]
- fn smoke_chan_gone() {
- let (tx, rx) = channel::<i32>();
- drop(tx);
- assert!(rx.recv().is_err());
- }
-
- #[test]
- fn smoke_chan_gone_shared() {
- let (tx, rx) = channel::<()>();
- let tx2 = tx.clone();
- drop(tx);
- drop(tx2);
- assert!(rx.recv().is_err());
- }
-
- #[test]
- fn chan_gone_concurrent() {
- let (tx, rx) = channel::<i32>();
- let _t = thread::spawn(move || {
- tx.send(1).unwrap();
- tx.send(1).unwrap();
- });
- while rx.recv().is_ok() {}
- }
-
- #[test]
- fn stress() {
- let (tx, rx) = channel::<i32>();
- let t = thread::spawn(move || {
- for _ in 0..10000 {
- tx.send(1).unwrap();
- }
- });
- for _ in 0..10000 {
- assert_eq!(rx.recv().unwrap(), 1);
- }
- t.join().ok().expect("thread panicked");
- }
-
- #[test]
- fn stress_shared() {
- const AMT: u32 = 10000;
- const NTHREADS: u32 = 8;
- let (tx, rx) = channel::<i32>();
-
- let t = thread::spawn(move || {
- for _ in 0..AMT * NTHREADS {
- assert_eq!(rx.recv().unwrap(), 1);
- }
- match rx.try_recv() {
- Ok(..) => panic!(),
- _ => {}
- }
- });
-
- for _ in 0..NTHREADS {
- let tx = tx.clone();
- thread::spawn(move || {
- for _ in 0..AMT {
- tx.send(1).unwrap();
- }
- });
- }
- drop(tx);
- t.join().ok().expect("thread panicked");
- }
-
- #[test]
- fn send_from_outside_runtime() {
- let (tx1, rx1) = channel::<()>();
- let (tx2, rx2) = channel::<i32>();
- let t1 = thread::spawn(move || {
- tx1.send(()).unwrap();
- for _ in 0..40 {
- assert_eq!(rx2.recv().unwrap(), 1);
- }
- });
- rx1.recv().unwrap();
- let t2 = thread::spawn(move || {
- for _ in 0..40 {
- tx2.send(1).unwrap();
- }
- });
- t1.join().ok().expect("thread panicked");
- t2.join().ok().expect("thread panicked");
- }
-
- #[test]
- fn recv_from_outside_runtime() {
- let (tx, rx) = channel::<i32>();
- let t = thread::spawn(move || {
- for _ in 0..40 {
- assert_eq!(rx.recv().unwrap(), 1);
- }
- });
- for _ in 0..40 {
- tx.send(1).unwrap();
- }
- t.join().ok().expect("thread panicked");
- }
-
- #[test]
- fn no_runtime() {
- let (tx1, rx1) = channel::<i32>();
- let (tx2, rx2) = channel::<i32>();
- let t1 = thread::spawn(move || {
- assert_eq!(rx1.recv().unwrap(), 1);
- tx2.send(2).unwrap();
- });
- let t2 = thread::spawn(move || {
- tx1.send(1).unwrap();
- assert_eq!(rx2.recv().unwrap(), 2);
- });
- t1.join().ok().expect("thread panicked");
- t2.join().ok().expect("thread panicked");
- }
-
- #[test]
- fn oneshot_single_thread_close_port_first() {
- // Simple test of closing without sending
- let (_tx, rx) = channel::<i32>();
- drop(rx);
- }
-
- #[test]
- fn oneshot_single_thread_close_chan_first() {
- // Simple test of closing without sending
- let (tx, _rx) = channel::<i32>();
- drop(tx);
- }
-
- #[test]
- fn oneshot_single_thread_send_port_close() {
- // Testing that the sender cleans up the payload if receiver is closed
- let (tx, rx) = channel::<Box<i32>>();
- drop(rx);
- assert!(tx.send(box 0).is_err());
- }
-
- #[test]
- fn oneshot_single_thread_recv_chan_close() {
- // Receiving on a closed chan will panic
- let res = thread::spawn(move || {
- let (tx, rx) = channel::<i32>();
- drop(tx);
- rx.recv().unwrap();
- })
- .join();
- // What is our res?
- assert!(res.is_err());
- }
-
- #[test]
- fn oneshot_single_thread_send_then_recv() {
- let (tx, rx) = channel::<Box<i32>>();
- tx.send(box 10).unwrap();
- assert!(*rx.recv().unwrap() == 10);
- }
-
- #[test]
- fn oneshot_single_thread_try_send_open() {
- let (tx, rx) = channel::<i32>();
- assert!(tx.send(10).is_ok());
- assert!(rx.recv().unwrap() == 10);
- }
-
- #[test]
- fn oneshot_single_thread_try_send_closed() {
- let (tx, rx) = channel::<i32>();
- drop(rx);
- assert!(tx.send(10).is_err());
- }
-
- #[test]
- fn oneshot_single_thread_try_recv_open() {
- let (tx, rx) = channel::<i32>();
- tx.send(10).unwrap();
- assert!(rx.recv() == Ok(10));
- }
-
- #[test]
- fn oneshot_single_thread_try_recv_closed() {
- let (tx, rx) = channel::<i32>();
- drop(tx);
- assert!(rx.recv().is_err());
- }
-
- #[test]
- fn oneshot_single_thread_peek_data() {
- let (tx, rx) = channel::<i32>();
- assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
- tx.send(10).unwrap();
- assert_eq!(rx.try_recv(), Ok(10));
- }
-
- #[test]
- fn oneshot_single_thread_peek_close() {
- let (tx, rx) = channel::<i32>();
- drop(tx);
- assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
- assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
- }
-
- #[test]
- fn oneshot_single_thread_peek_open() {
- let (_tx, rx) = channel::<i32>();
- assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
- }
-
- #[test]
- fn oneshot_multi_task_recv_then_send() {
- let (tx, rx) = channel::<Box<i32>>();
- let _t = thread::spawn(move || {
- assert!(*rx.recv().unwrap() == 10);
- });
-
- tx.send(box 10).unwrap();
- }
-
- #[test]
- fn oneshot_multi_task_recv_then_close() {
- let (tx, rx) = channel::<Box<i32>>();
- let _t = thread::spawn(move || {
- drop(tx);
- });
- let res = thread::spawn(move || {
- assert!(*rx.recv().unwrap() == 10);
- })
- .join();
- assert!(res.is_err());
- }
-
- #[test]
- fn oneshot_multi_thread_close_stress() {
- for _ in 0..stress_factor() {
- let (tx, rx) = channel::<i32>();
- let _t = thread::spawn(move || {
- drop(rx);
- });
- drop(tx);
- }
- }
-
- #[test]
- fn oneshot_multi_thread_send_close_stress() {
- for _ in 0..stress_factor() {
- let (tx, rx) = channel::<i32>();
- let _t = thread::spawn(move || {
- drop(rx);
- });
- let _ = thread::spawn(move || {
- tx.send(1).unwrap();
- })
- .join();
- }
- }
-
- #[test]
- fn oneshot_multi_thread_recv_close_stress() {
- for _ in 0..stress_factor() {
- let (tx, rx) = channel::<i32>();
- thread::spawn(move || {
- let res = thread::spawn(move || {
- rx.recv().unwrap();
- })
- .join();
- assert!(res.is_err());
- });
- let _t = thread::spawn(move || {
- thread::spawn(move || {
- drop(tx);
- });
- });
- }
- }
-
- #[test]
- fn oneshot_multi_thread_send_recv_stress() {
- for _ in 0..stress_factor() {
- let (tx, rx) = channel::<Box<isize>>();
- let _t = thread::spawn(move || {
- tx.send(box 10).unwrap();
- });
- assert!(*rx.recv().unwrap() == 10);
- }
- }
-
- #[test]
- fn stream_send_recv_stress() {
- for _ in 0..stress_factor() {
- let (tx, rx) = channel();
-
- send(tx, 0);
- recv(rx, 0);
-
- fn send(tx: Sender<Box<i32>>, i: i32) {
- if i == 10 {
- return;
- }
-
- thread::spawn(move || {
- tx.send(box i).unwrap();
- send(tx, i + 1);
- });
- }
-
- fn recv(rx: Receiver<Box<i32>>, i: i32) {
- if i == 10 {
- return;
- }
-
- thread::spawn(move || {
- assert!(*rx.recv().unwrap() == i);
- recv(rx, i + 1);
- });
- }
- }
- }
-
- #[test]
- fn oneshot_single_thread_recv_timeout() {
- let (tx, rx) = channel();
- tx.send(()).unwrap();
- assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
- assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
- tx.send(()).unwrap();
- assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
- }
-
- #[test]
- fn stress_recv_timeout_two_threads() {
- let (tx, rx) = channel();
- let stress = stress_factor() + 100;
- let timeout = Duration::from_millis(100);
-
- thread::spawn(move || {
- for i in 0..stress {
- if i % 2 == 0 {
- thread::sleep(timeout * 2);
- }
- tx.send(1usize).unwrap();
- }
- });
-
- let mut recv_count = 0;
- loop {
- match rx.recv_timeout(timeout) {
- Ok(n) => {
- assert_eq!(n, 1usize);
- recv_count += 1;
- }
- Err(RecvTimeoutError::Timeout) => continue,
- Err(RecvTimeoutError::Disconnected) => break,
- }
- }
-
- assert_eq!(recv_count, stress);
- }
-
- #[test]
- fn recv_timeout_upgrade() {
- let (tx, rx) = channel::<()>();
- let timeout = Duration::from_millis(1);
- let _tx_clone = tx.clone();
-
- let start = Instant::now();
- assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
- assert!(Instant::now() >= start + timeout);
- }
-
- #[test]
- fn stress_recv_timeout_shared() {
- let (tx, rx) = channel();
- let stress = stress_factor() + 100;
-
- for i in 0..stress {
- let tx = tx.clone();
- thread::spawn(move || {
- thread::sleep(Duration::from_millis(i as u64 * 10));
- tx.send(1usize).unwrap();
- });
- }
-
- drop(tx);
-
- let mut recv_count = 0;
- loop {
- match rx.recv_timeout(Duration::from_millis(10)) {
- Ok(n) => {
- assert_eq!(n, 1usize);
- recv_count += 1;
- }
- Err(RecvTimeoutError::Timeout) => continue,
- Err(RecvTimeoutError::Disconnected) => break,
- }
- }
-
- assert_eq!(recv_count, stress);
- }
-
- #[test]
- fn very_long_recv_timeout_wont_panic() {
- let (tx, rx) = channel::<()>();
- let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX)));
- thread::sleep(Duration::from_secs(1));
- assert!(tx.send(()).is_ok());
- assert_eq!(join_handle.join().unwrap(), Ok(()));
- }
-
- #[test]
- fn recv_a_lot() {
- // Regression test that we don't run out of stack in scheduler context
- let (tx, rx) = channel();
- for _ in 0..10000 {
- tx.send(()).unwrap();
- }
- for _ in 0..10000 {
- rx.recv().unwrap();
- }
- }
-
- #[test]
- fn shared_recv_timeout() {
- let (tx, rx) = channel();
- let total = 5;
- for _ in 0..total {
- let tx = tx.clone();
- thread::spawn(move || {
- tx.send(()).unwrap();
- });
- }
-
- for _ in 0..total {
- rx.recv().unwrap();
- }
-
- assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
- tx.send(()).unwrap();
- assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
- }
-
- #[test]
- fn shared_chan_stress() {
- let (tx, rx) = channel();
- let total = stress_factor() + 100;
- for _ in 0..total {
- let tx = tx.clone();
- thread::spawn(move || {
- tx.send(()).unwrap();
- });
- }
-
- for _ in 0..total {
- rx.recv().unwrap();
- }
- }
-
- #[test]
- fn test_nested_recv_iter() {
- let (tx, rx) = channel::<i32>();
- let (total_tx, total_rx) = channel::<i32>();
-
- let _t = thread::spawn(move || {
- let mut acc = 0;
- for x in rx.iter() {
- acc += x;
- }
- total_tx.send(acc).unwrap();
- });
-
- tx.send(3).unwrap();
- tx.send(1).unwrap();
- tx.send(2).unwrap();
- drop(tx);
- assert_eq!(total_rx.recv().unwrap(), 6);
- }
-
- #[test]
- fn test_recv_iter_break() {
- let (tx, rx) = channel::<i32>();
- let (count_tx, count_rx) = channel();
-
- let _t = thread::spawn(move || {
- let mut count = 0;
- for x in rx.iter() {
- if count >= 3 {
- break;
- } else {
- count += x;
- }
- }
- count_tx.send(count).unwrap();
- });
-
- tx.send(2).unwrap();
- tx.send(2).unwrap();
- tx.send(2).unwrap();
- let _ = tx.send(2);
- drop(tx);
- assert_eq!(count_rx.recv().unwrap(), 4);
- }
-
- #[test]
- fn test_recv_try_iter() {
- let (request_tx, request_rx) = channel();
- let (response_tx, response_rx) = channel();
-
- // Request `x`s until we have `6`.
- let t = thread::spawn(move || {
- let mut count = 0;
- loop {
- for x in response_rx.try_iter() {
- count += x;
- if count == 6 {
- return count;
- }
- }
- request_tx.send(()).unwrap();
- }
- });
-
- for _ in request_rx.iter() {
- if response_tx.send(2).is_err() {
- break;
- }
- }
-
- assert_eq!(t.join().unwrap(), 6);
- }
-
- #[test]
- fn test_recv_into_iter_owned() {
- let mut iter = {
- let (tx, rx) = channel::<i32>();
- tx.send(1).unwrap();
- tx.send(2).unwrap();
-
- rx.into_iter()
- };
- assert_eq!(iter.next().unwrap(), 1);
- assert_eq!(iter.next().unwrap(), 2);
- assert_eq!(iter.next().is_none(), true);
- }
-
- #[test]
- fn test_recv_into_iter_borrowed() {
- let (tx, rx) = channel::<i32>();
- tx.send(1).unwrap();
- tx.send(2).unwrap();
- drop(tx);
- let mut iter = (&rx).into_iter();
- assert_eq!(iter.next().unwrap(), 1);
- assert_eq!(iter.next().unwrap(), 2);
- assert_eq!(iter.next().is_none(), true);
- }
-
- #[test]
- fn try_recv_states() {
- let (tx1, rx1) = channel::<i32>();
- let (tx2, rx2) = channel::<()>();
- let (tx3, rx3) = channel::<()>();
- let _t = thread::spawn(move || {
- rx2.recv().unwrap();
- tx1.send(1).unwrap();
- tx3.send(()).unwrap();
- rx2.recv().unwrap();
- drop(tx1);
- tx3.send(()).unwrap();
- });
-
- assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
- tx2.send(()).unwrap();
- rx3.recv().unwrap();
- assert_eq!(rx1.try_recv(), Ok(1));
- assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
- tx2.send(()).unwrap();
- rx3.recv().unwrap();
- assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
- }
-
- // This bug used to end up in a livelock inside of the Receiver destructor
- // because the internal state of the Shared packet was corrupted
- #[test]
- fn destroy_upgraded_shared_port_when_sender_still_active() {
- let (tx, rx) = channel();
- let (tx2, rx2) = channel();
- let _t = thread::spawn(move || {
- rx.recv().unwrap(); // wait on a oneshot
- drop(rx); // destroy a shared
- tx2.send(()).unwrap();
- });
- // make sure the other thread has gone to sleep
- for _ in 0..5000 {
- thread::yield_now();
- }
-
- // upgrade to a shared chan and send a message
- let t = tx.clone();
- drop(tx);
- t.send(()).unwrap();
-
- // wait for the child thread to exit before we exit
- rx2.recv().unwrap();
- }
-
- #[test]
- fn issue_32114() {
- let (tx, _) = channel();
- let _ = tx.send(123);
- assert_eq!(tx.send(123), Err(SendError(123)));
- }
-}
-
-#[cfg(all(test, not(target_os = "emscripten")))]
-mod sync_tests {
- use super::*;
- use crate::env;
- use crate::thread;
- use crate::time::Duration;
-
- pub fn stress_factor() -> usize {
- match env::var("RUST_TEST_STRESS") {
- Ok(val) => val.parse().unwrap(),
- Err(..) => 1,
- }
- }
-
- #[test]
- fn smoke() {
- let (tx, rx) = sync_channel::<i32>(1);
- tx.send(1).unwrap();
- assert_eq!(rx.recv().unwrap(), 1);
- }
-
- #[test]
- fn drop_full() {
- let (tx, _rx) = sync_channel::<Box<isize>>(1);
- tx.send(box 1).unwrap();
- }
-
- #[test]
- fn smoke_shared() {
- let (tx, rx) = sync_channel::<i32>(1);
- tx.send(1).unwrap();
- assert_eq!(rx.recv().unwrap(), 1);
- let tx = tx.clone();
- tx.send(1).unwrap();
- assert_eq!(rx.recv().unwrap(), 1);
- }
-
- #[test]
- fn recv_timeout() {
- let (tx, rx) = sync_channel::<i32>(1);
- assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
- tx.send(1).unwrap();
- assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
- }
-
- #[test]
- fn smoke_threads() {
- let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move || {
- tx.send(1).unwrap();
- });
- assert_eq!(rx.recv().unwrap(), 1);
- }
-
- #[test]
- fn smoke_port_gone() {
- let (tx, rx) = sync_channel::<i32>(0);
- drop(rx);
- assert!(tx.send(1).is_err());
- }
-
- #[test]
- fn smoke_shared_port_gone2() {
- let (tx, rx) = sync_channel::<i32>(0);
- drop(rx);
- let tx2 = tx.clone();
- drop(tx);
- assert!(tx2.send(1).is_err());
- }
-
- #[test]
- fn port_gone_concurrent() {
- let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move || {
- rx.recv().unwrap();
- });
- while tx.send(1).is_ok() {}
- }
-
- #[test]
- fn port_gone_concurrent_shared() {
- let (tx, rx) = sync_channel::<i32>(0);
- let tx2 = tx.clone();
- let _t = thread::spawn(move || {
- rx.recv().unwrap();
- });
- while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
- }
-
- #[test]
- fn smoke_chan_gone() {
- let (tx, rx) = sync_channel::<i32>(0);
- drop(tx);
- assert!(rx.recv().is_err());
- }
-
- #[test]
- fn smoke_chan_gone_shared() {
- let (tx, rx) = sync_channel::<()>(0);
- let tx2 = tx.clone();
- drop(tx);
- drop(tx2);
- assert!(rx.recv().is_err());
- }
-
- #[test]
- fn chan_gone_concurrent() {
- let (tx, rx) = sync_channel::<i32>(0);
- thread::spawn(move || {
- tx.send(1).unwrap();
- tx.send(1).unwrap();
- });
- while rx.recv().is_ok() {}
- }
-
- #[test]
- fn stress() {
- let (tx, rx) = sync_channel::<i32>(0);
- thread::spawn(move || {
- for _ in 0..10000 {
- tx.send(1).unwrap();
- }
- });
- for _ in 0..10000 {
- assert_eq!(rx.recv().unwrap(), 1);
- }
- }
-
- #[test]
- fn stress_recv_timeout_two_threads() {
- let (tx, rx) = sync_channel::<i32>(0);
-
- thread::spawn(move || {
- for _ in 0..10000 {
- tx.send(1).unwrap();
- }
- });
-
- let mut recv_count = 0;
- loop {
- match rx.recv_timeout(Duration::from_millis(1)) {
- Ok(v) => {
- assert_eq!(v, 1);
- recv_count += 1;
- }
- Err(RecvTimeoutError::Timeout) => continue,
- Err(RecvTimeoutError::Disconnected) => break,
- }
- }
-
- assert_eq!(recv_count, 10000);
- }
-
- #[test]
- fn stress_recv_timeout_shared() {
- const AMT: u32 = 1000;
- const NTHREADS: u32 = 8;
- let (tx, rx) = sync_channel::<i32>(0);
- let (dtx, drx) = sync_channel::<()>(0);
-
- thread::spawn(move || {
- let mut recv_count = 0;
- loop {
- match rx.recv_timeout(Duration::from_millis(10)) {
- Ok(v) => {
- assert_eq!(v, 1);
- recv_count += 1;
- }
- Err(RecvTimeoutError::Timeout) => continue,
- Err(RecvTimeoutError::Disconnected) => break,
- }
- }
-
- assert_eq!(recv_count, AMT * NTHREADS);
- assert!(rx.try_recv().is_err());
-
- dtx.send(()).unwrap();
- });
-
- for _ in 0..NTHREADS {
- let tx = tx.clone();
- thread::spawn(move || {
- for _ in 0..AMT {
- tx.send(1).unwrap();
- }
- });
- }
-
- drop(tx);
-
- drx.recv().unwrap();
- }
-
- #[test]
- fn stress_shared() {
- const AMT: u32 = 1000;
- const NTHREADS: u32 = 8;
- let (tx, rx) = sync_channel::<i32>(0);
- let (dtx, drx) = sync_channel::<()>(0);
-
- thread::spawn(move || {
- for _ in 0..AMT * NTHREADS {
- assert_eq!(rx.recv().unwrap(), 1);
- }
- match rx.try_recv() {
- Ok(..) => panic!(),
- _ => {}
- }
- dtx.send(()).unwrap();
- });
-
- for _ in 0..NTHREADS {
- let tx = tx.clone();
- thread::spawn(move || {
- for _ in 0..AMT {
- tx.send(1).unwrap();
- }
- });
- }
- drop(tx);
- drx.recv().unwrap();
- }
-
- #[test]
- fn oneshot_single_thread_close_port_first() {
- // Simple test of closing without sending
- let (_tx, rx) = sync_channel::<i32>(0);
- drop(rx);
- }
-
- #[test]
- fn oneshot_single_thread_close_chan_first() {
- // Simple test of closing without sending
- let (tx, _rx) = sync_channel::<i32>(0);
- drop(tx);
- }
-
- #[test]
- fn oneshot_single_thread_send_port_close() {
- // Testing that the sender cleans up the payload if receiver is closed
- let (tx, rx) = sync_channel::<Box<i32>>(0);
- drop(rx);
- assert!(tx.send(box 0).is_err());
- }
-
- #[test]
- fn oneshot_single_thread_recv_chan_close() {
- // Receiving on a closed chan will panic
- let res = thread::spawn(move || {
- let (tx, rx) = sync_channel::<i32>(0);
- drop(tx);
- rx.recv().unwrap();
- })
- .join();
- // What is our res?
- assert!(res.is_err());
- }
-
- #[test]
- fn oneshot_single_thread_send_then_recv() {
- let (tx, rx) = sync_channel::<Box<i32>>(1);
- tx.send(box 10).unwrap();
- assert!(*rx.recv().unwrap() == 10);
- }
-
- #[test]
- fn oneshot_single_thread_try_send_open() {
- let (tx, rx) = sync_channel::<i32>(1);
- assert_eq!(tx.try_send(10), Ok(()));
- assert!(rx.recv().unwrap() == 10);
- }
-
- #[test]
- fn oneshot_single_thread_try_send_closed() {
- let (tx, rx) = sync_channel::<i32>(0);
- drop(rx);
- assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
- }
-
- #[test]
- fn oneshot_single_thread_try_send_closed2() {
- let (tx, _rx) = sync_channel::<i32>(0);
- assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
- }
-
- #[test]
- fn oneshot_single_thread_try_recv_open() {
- let (tx, rx) = sync_channel::<i32>(1);
- tx.send(10).unwrap();
- assert!(rx.recv() == Ok(10));
- }
-
- #[test]
- fn oneshot_single_thread_try_recv_closed() {
- let (tx, rx) = sync_channel::<i32>(0);
- drop(tx);
- assert!(rx.recv().is_err());
- }
-
- #[test]
- fn oneshot_single_thread_try_recv_closed_with_data() {
- let (tx, rx) = sync_channel::<i32>(1);
- tx.send(10).unwrap();
- drop(tx);
- assert_eq!(rx.try_recv(), Ok(10));
- assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
- }
-
- #[test]
- fn oneshot_single_thread_peek_data() {
- let (tx, rx) = sync_channel::<i32>(1);
- assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
- tx.send(10).unwrap();
- assert_eq!(rx.try_recv(), Ok(10));
- }
-
- #[test]
- fn oneshot_single_thread_peek_close() {
- let (tx, rx) = sync_channel::<i32>(0);
- drop(tx);
- assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
- assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
- }
-
- #[test]
- fn oneshot_single_thread_peek_open() {
- let (_tx, rx) = sync_channel::<i32>(0);
- assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
- }
-
- #[test]
- fn oneshot_multi_task_recv_then_send() {
- let (tx, rx) = sync_channel::<Box<i32>>(0);
- let _t = thread::spawn(move || {
- assert!(*rx.recv().unwrap() == 10);
- });
-
- tx.send(box 10).unwrap();
- }
-
- #[test]
- fn oneshot_multi_task_recv_then_close() {
- let (tx, rx) = sync_channel::<Box<i32>>(0);
- let _t = thread::spawn(move || {
- drop(tx);
- });
- let res = thread::spawn(move || {
- assert!(*rx.recv().unwrap() == 10);
- })
- .join();
- assert!(res.is_err());
- }
-
- #[test]
- fn oneshot_multi_thread_close_stress() {
- for _ in 0..stress_factor() {
- let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move || {
- drop(rx);
- });
- drop(tx);
- }
- }
-
- #[test]
- fn oneshot_multi_thread_send_close_stress() {
- for _ in 0..stress_factor() {
- let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move || {
- drop(rx);
- });
- let _ = thread::spawn(move || {
- tx.send(1).unwrap();
- })
- .join();
- }
- }
-
- #[test]
- fn oneshot_multi_thread_recv_close_stress() {
- for _ in 0..stress_factor() {
- let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move || {
- let res = thread::spawn(move || {
- rx.recv().unwrap();
- })
- .join();
- assert!(res.is_err());
- });
- let _t = thread::spawn(move || {
- thread::spawn(move || {
- drop(tx);
- });
- });
- }
- }
-
- #[test]
- fn oneshot_multi_thread_send_recv_stress() {
- for _ in 0..stress_factor() {
- let (tx, rx) = sync_channel::<Box<i32>>(0);
- let _t = thread::spawn(move || {
- tx.send(box 10).unwrap();
- });
- assert!(*rx.recv().unwrap() == 10);
- }
- }
-
- #[test]
- fn stream_send_recv_stress() {
- for _ in 0..stress_factor() {
- let (tx, rx) = sync_channel::<Box<i32>>(0);
-
- send(tx, 0);
- recv(rx, 0);
-
- fn send(tx: SyncSender<Box<i32>>, i: i32) {
- if i == 10 {
- return;
- }
-
- thread::spawn(move || {
- tx.send(box i).unwrap();
- send(tx, i + 1);
- });
- }
-
- fn recv(rx: Receiver<Box<i32>>, i: i32) {
- if i == 10 {
- return;
- }
-
- thread::spawn(move || {
- assert!(*rx.recv().unwrap() == i);
- recv(rx, i + 1);
- });
- }
- }
- }
-
- #[test]
- fn recv_a_lot() {
- // Regression test that we don't run out of stack in scheduler context
- let (tx, rx) = sync_channel(10000);
- for _ in 0..10000 {
- tx.send(()).unwrap();
- }
- for _ in 0..10000 {
- rx.recv().unwrap();
- }
- }
-
- #[test]
- fn shared_chan_stress() {
- let (tx, rx) = sync_channel(0);
- let total = stress_factor() + 100;
- for _ in 0..total {
- let tx = tx.clone();
- thread::spawn(move || {
- tx.send(()).unwrap();
- });
- }
-
- for _ in 0..total {
- rx.recv().unwrap();
- }
- }
-
- #[test]
- fn test_nested_recv_iter() {
- let (tx, rx) = sync_channel::<i32>(0);
- let (total_tx, total_rx) = sync_channel::<i32>(0);
-
- let _t = thread::spawn(move || {
- let mut acc = 0;
- for x in rx.iter() {
- acc += x;
- }
- total_tx.send(acc).unwrap();
- });
-
- tx.send(3).unwrap();
- tx.send(1).unwrap();
- tx.send(2).unwrap();
- drop(tx);
- assert_eq!(total_rx.recv().unwrap(), 6);
- }
-
- #[test]
- fn test_recv_iter_break() {
- let (tx, rx) = sync_channel::<i32>(0);
- let (count_tx, count_rx) = sync_channel(0);
-
- let _t = thread::spawn(move || {
- let mut count = 0;
- for x in rx.iter() {
- if count >= 3 {
- break;
- } else {
- count += x;
- }
- }
- count_tx.send(count).unwrap();
- });
-
- tx.send(2).unwrap();
- tx.send(2).unwrap();
- tx.send(2).unwrap();
- let _ = tx.try_send(2);
- drop(tx);
- assert_eq!(count_rx.recv().unwrap(), 4);
- }
-
- #[test]
- fn try_recv_states() {
- let (tx1, rx1) = sync_channel::<i32>(1);
- let (tx2, rx2) = sync_channel::<()>(1);
- let (tx3, rx3) = sync_channel::<()>(1);
- let _t = thread::spawn(move || {
- rx2.recv().unwrap();
- tx1.send(1).unwrap();
- tx3.send(()).unwrap();
- rx2.recv().unwrap();
- drop(tx1);
- tx3.send(()).unwrap();
- });
-
- assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
- tx2.send(()).unwrap();
- rx3.recv().unwrap();
- assert_eq!(rx1.try_recv(), Ok(1));
- assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
- tx2.send(()).unwrap();
- rx3.recv().unwrap();
- assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
- }
-
- // This bug used to end up in a livelock inside of the Receiver destructor
- // because the internal state of the Shared packet was corrupted
- #[test]
- fn destroy_upgraded_shared_port_when_sender_still_active() {
- let (tx, rx) = sync_channel::<()>(0);
- let (tx2, rx2) = sync_channel::<()>(0);
- let _t = thread::spawn(move || {
- rx.recv().unwrap(); // wait on a oneshot
- drop(rx); // destroy a shared
- tx2.send(()).unwrap();
- });
- // make sure the other thread has gone to sleep
- for _ in 0..5000 {
- thread::yield_now();
- }
-
- // upgrade to a shared chan and send a message
- let t = tx.clone();
- drop(tx);
- t.send(()).unwrap();
-
- // wait for the child thread to exit before we exit
- rx2.recv().unwrap();
- }
-
- #[test]
- fn send1() {
- let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move || {
- rx.recv().unwrap();
- });
- assert_eq!(tx.send(1), Ok(()));
- }
-
- #[test]
- fn send2() {
- let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move || {
- drop(rx);
- });
- assert!(tx.send(1).is_err());
- }
-
- #[test]
- fn send3() {
- let (tx, rx) = sync_channel::<i32>(1);
- assert_eq!(tx.send(1), Ok(()));
- let _t = thread::spawn(move || {
- drop(rx);
- });
- assert!(tx.send(1).is_err());
- }
-
- #[test]
- fn send4() {
- let (tx, rx) = sync_channel::<i32>(0);
- let tx2 = tx.clone();
- let (done, donerx) = channel();
- let done2 = done.clone();
- let _t = thread::spawn(move || {
- assert!(tx.send(1).is_err());
- done.send(()).unwrap();
- });
- let _t = thread::spawn(move || {
- assert!(tx2.send(2).is_err());
- done2.send(()).unwrap();
- });
- drop(rx);
- donerx.recv().unwrap();
- donerx.recv().unwrap();
- }
-
- #[test]
- fn try_send1() {
- let (tx, _rx) = sync_channel::<i32>(0);
- assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
- }
-
- #[test]
- fn try_send2() {
- let (tx, _rx) = sync_channel::<i32>(1);
- assert_eq!(tx.try_send(1), Ok(()));
- assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
- }
-
- #[test]
- fn try_send3() {
- let (tx, rx) = sync_channel::<i32>(1);
- assert_eq!(tx.try_send(1), Ok(()));
- drop(rx);
- assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
- }
-
- #[test]
- fn issue_15761() {
- fn repro() {
- let (tx1, rx1) = sync_channel::<()>(3);
- let (tx2, rx2) = sync_channel::<()>(3);
-
- let _t = thread::spawn(move || {
- rx1.recv().unwrap();
- tx2.try_send(()).unwrap();
- });
-
- tx1.try_send(()).unwrap();
- rx2.recv().unwrap();
- }
-
- for _ in 0..100 {
- repro()
- }
- }
-}
diff --git a/src/libstd/sync/mpsc/mpsc_queue.rs b/src/libstd/sync/mpsc/mpsc_queue.rs
deleted file mode 100644
index 6e7a7be4430..00000000000
--- a/src/libstd/sync/mpsc/mpsc_queue.rs
+++ /dev/null
@@ -1,165 +0,0 @@
-//! A mostly lock-free multi-producer, single consumer queue.
-//!
-//! This module contains an implementation of a concurrent MPSC queue. This
-//! queue can be used to share data between threads, and is also used as the
-//! building block of channels in rust.
-//!
-//! Note that the current implementation of this queue has a caveat of the `pop`
-//! method, and see the method for more information about it. Due to this
-//! caveat, this queue may not be appropriate for all use-cases.
-
-// http://www.1024cores.net/home/lock-free-algorithms
-// /queues/non-intrusive-mpsc-node-based-queue
-
-pub use self::PopResult::*;
-
-use core::cell::UnsafeCell;
-use core::ptr;
-
-use crate::boxed::Box;
-use crate::sync::atomic::{AtomicPtr, Ordering};
-
-/// A result of the `pop` function.
-pub enum PopResult<T> {
- /// Some data has been popped
- Data(T),
- /// The queue is empty
- Empty,
- /// The queue is in an inconsistent state. Popping data should succeed, but
- /// some pushers have yet to make enough progress in order allow a pop to
- /// succeed. It is recommended that a pop() occur "in the near future" in
- /// order to see if the sender has made progress or not
- Inconsistent,
-}
-
-struct Node<T> {
- next: AtomicPtr<Node<T>>,
- value: Option<T>,
-}
-
-/// The multi-producer single-consumer structure. This is not cloneable, but it
-/// may be safely shared so long as it is guaranteed that there is only one
-/// popper at a time (many pushers are allowed).
-pub struct Queue<T> {
- head: AtomicPtr<Node<T>>,
- tail: UnsafeCell<*mut Node<T>>,
-}
-
-unsafe impl<T: Send> Send for Queue<T> {}
-unsafe impl<T: Send> Sync for Queue<T> {}
-
-impl<T> Node<T> {
- unsafe fn new(v: Option<T>) -> *mut Node<T> {
- Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
- }
-}
-
-impl<T> Queue<T> {
- /// Creates a new queue that is safe to share among multiple producers and
- /// one consumer.
- pub fn new() -> Queue<T> {
- let stub = unsafe { Node::new(None) };
- Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
- }
-
- /// Pushes a new value onto this queue.
- pub fn push(&self, t: T) {
- unsafe {
- let n = Node::new(Some(t));
- let prev = self.head.swap(n, Ordering::AcqRel);
- (*prev).next.store(n, Ordering::Release);
- }
- }
-
- /// Pops some data from this queue.
- ///
- /// Note that the current implementation means that this function cannot
- /// return `Option<T>`. It is possible for this queue to be in an
- /// inconsistent state where many pushes have succeeded and completely
- /// finished, but pops cannot return `Some(t)`. This inconsistent state
- /// happens when a pusher is pre-empted at an inopportune moment.
- ///
- /// This inconsistent state means that this queue does indeed have data, but
- /// it does not currently have access to it at this time.
- pub fn pop(&self) -> PopResult<T> {
- unsafe {
- let tail = *self.tail.get();
- let next = (*tail).next.load(Ordering::Acquire);
-
- if !next.is_null() {
- *self.tail.get() = next;
- assert!((*tail).value.is_none());
- assert!((*next).value.is_some());
- let ret = (*next).value.take().unwrap();
- let _: Box<Node<T>> = Box::from_raw(tail);
- return Data(ret);
- }
-
- if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
- }
- }
-}
-
-impl<T> Drop for Queue<T> {
- fn drop(&mut self) {
- unsafe {
- let mut cur = *self.tail.get();
- while !cur.is_null() {
- let next = (*cur).next.load(Ordering::Relaxed);
- let _: Box<Node<T>> = Box::from_raw(cur);
- cur = next;
- }
- }
- }
-}
-
-#[cfg(all(test, not(target_os = "emscripten")))]
-mod tests {
- use super::{Data, Empty, Inconsistent, Queue};
- use crate::sync::mpsc::channel;
- use crate::sync::Arc;
- use crate::thread;
-
- #[test]
- fn test_full() {
- let q: Queue<Box<_>> = Queue::new();
- q.push(box 1);
- q.push(box 2);
- }
-
- #[test]
- fn test() {
- let nthreads = 8;
- let nmsgs = 1000;
- let q = Queue::new();
- match q.pop() {
- Empty => {}
- Inconsistent | Data(..) => panic!(),
- }
- let (tx, rx) = channel();
- let q = Arc::new(q);
-
- for _ in 0..nthreads {
- let tx = tx.clone();
- let q = q.clone();
- thread::spawn(move || {
- for i in 0..nmsgs {
- q.push(i);
- }
- tx.send(()).unwrap();
- });
- }
-
- let mut i = 0;
- while i < nthreads * nmsgs {
- match q.pop() {
- Empty | Inconsistent => {}
- Data(_) => i += 1,
- }
- }
- drop(tx);
- for _ in 0..nthreads {
- rx.recv().unwrap();
- }
- }
-}
diff --git a/src/libstd/sync/mpsc/oneshot.rs b/src/libstd/sync/mpsc/oneshot.rs
deleted file mode 100644
index 75f5621fa12..00000000000
--- a/src/libstd/sync/mpsc/oneshot.rs
+++ /dev/null
@@ -1,307 +0,0 @@
-/// Oneshot channels/ports
-///
-/// This is the initial flavor of channels/ports used for comm module. This is
-/// an optimization for the one-use case of a channel. The major optimization of
-/// this type is to have one and exactly one allocation when the chan/port pair
-/// is created.
-///
-/// Another possible optimization would be to not use an Arc box because
-/// in theory we know when the shared packet can be deallocated (no real need
-/// for the atomic reference counting), but I was having trouble how to destroy
-/// the data early in a drop of a Port.
-///
-/// # Implementation
-///
-/// Oneshots are implemented around one atomic usize variable. This variable
-/// indicates both the state of the port/chan but also contains any threads
-/// blocked on the port. All atomic operations happen on this one word.
-///
-/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
-/// on behalf of the channel side of things (it can be mentally thought of as
-/// consuming the port). This upgrade is then also stored in the shared packet.
-/// The one caveat to consider is that when a port sees a disconnected channel
-/// it must check for data because there is no "data plus upgrade" state.
-pub use self::Failure::*;
-use self::MyUpgrade::*;
-pub use self::UpgradeResult::*;
-
-use crate::cell::UnsafeCell;
-use crate::ptr;
-use crate::sync::atomic::{AtomicUsize, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken};
-use crate::sync::mpsc::Receiver;
-use crate::time::Instant;
-
-// Various states you can find a port in.
-const EMPTY: usize = 0; // initial state: no data, no blocked receiver
-const DATA: usize = 1; // data ready for receiver to take
-const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
-// Any other value represents a pointer to a SignalToken value. The
-// protocol ensures that when the state moves *to* a pointer,
-// ownership of the token is given to the packet, and when the state
-// moves *from* a pointer, ownership of the token is transferred to
-// whoever changed the state.
-
-pub struct Packet<T> {
- // Internal state of the chan/port pair (stores the blocked thread as well)
- state: AtomicUsize,
- // One-shot data slot location
- data: UnsafeCell<Option<T>>,
- // when used for the second time, a oneshot channel must be upgraded, and
- // this contains the slot for the upgrade
- upgrade: UnsafeCell<MyUpgrade<T>>,
-}
-
-pub enum Failure<T> {
- Empty,
- Disconnected,
- Upgraded(Receiver<T>),
-}
-
-pub enum UpgradeResult {
- UpSuccess,
- UpDisconnected,
- UpWoke(SignalToken),
-}
-
-enum MyUpgrade<T> {
- NothingSent,
- SendUsed,
- GoUp(Receiver<T>),
-}
-
-impl<T> Packet<T> {
- pub fn new() -> Packet<T> {
- Packet {
- data: UnsafeCell::new(None),
- upgrade: UnsafeCell::new(NothingSent),
- state: AtomicUsize::new(EMPTY),
- }
- }
-
- pub fn send(&self, t: T) -> Result<(), T> {
- unsafe {
- // Sanity check
- match *self.upgrade.get() {
- NothingSent => {}
- _ => panic!("sending on a oneshot that's already sent on "),
- }
- assert!((*self.data.get()).is_none());
- ptr::write(self.data.get(), Some(t));
- ptr::write(self.upgrade.get(), SendUsed);
-
- match self.state.swap(DATA, Ordering::SeqCst) {
- // Sent the data, no one was waiting
- EMPTY => Ok(()),
-
- // Couldn't send the data, the port hung up first. Return the data
- // back up the stack.
- DISCONNECTED => {
- self.state.swap(DISCONNECTED, Ordering::SeqCst);
- ptr::write(self.upgrade.get(), NothingSent);
- Err((&mut *self.data.get()).take().unwrap())
- }
-
- // Not possible, these are one-use channels
- DATA => unreachable!(),
-
- // There is a thread waiting on the other end. We leave the 'DATA'
- // state inside so it'll pick it up on the other end.
- ptr => {
- SignalToken::cast_from_usize(ptr).signal();
- Ok(())
- }
- }
- }
- }
-
- // Just tests whether this channel has been sent on or not, this is only
- // safe to use from the sender.
- pub fn sent(&self) -> bool {
- unsafe { !matches!(*self.upgrade.get(), NothingSent) }
- }
-
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
- // Attempt to not block the thread (it's a little expensive). If it looks
- // like we're not empty, then immediately go through to `try_recv`.
- if self.state.load(Ordering::SeqCst) == EMPTY {
- let (wait_token, signal_token) = blocking::tokens();
- let ptr = unsafe { signal_token.cast_to_usize() };
-
- // race with senders to enter the blocking state
- if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY {
- if let Some(deadline) = deadline {
- let timed_out = !wait_token.wait_max_until(deadline);
- // Try to reset the state
- if timed_out {
- self.abort_selection().map_err(Upgraded)?;
- }
- } else {
- wait_token.wait();
- debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
- }
- } else {
- // drop the signal token, since we never blocked
- drop(unsafe { SignalToken::cast_from_usize(ptr) });
- }
- }
-
- self.try_recv()
- }
-
- pub fn try_recv(&self) -> Result<T, Failure<T>> {
- unsafe {
- match self.state.load(Ordering::SeqCst) {
- EMPTY => Err(Empty),
-
- // We saw some data on the channel, but the channel can be used
- // again to send us an upgrade. As a result, we need to re-insert
- // into the channel that there's no data available (otherwise we'll
- // just see DATA next time). This is done as a cmpxchg because if
- // the state changes under our feet we'd rather just see that state
- // change.
- DATA => {
- self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
- match (&mut *self.data.get()).take() {
- Some(data) => Ok(data),
- None => unreachable!(),
- }
- }
-
- // There's no guarantee that we receive before an upgrade happens,
- // and an upgrade flags the channel as disconnected, so when we see
- // this we first need to check if there's data available and *then*
- // we go through and process the upgrade.
- DISCONNECTED => match (&mut *self.data.get()).take() {
- Some(data) => Ok(data),
- None => match ptr::replace(self.upgrade.get(), SendUsed) {
- SendUsed | NothingSent => Err(Disconnected),
- GoUp(upgrade) => Err(Upgraded(upgrade)),
- },
- },
-
- // We are the sole receiver; there cannot be a blocking
- // receiver already.
- _ => unreachable!(),
- }
- }
- }
-
- // Returns whether the upgrade was completed. If the upgrade wasn't
- // completed, then the port couldn't get sent to the other half (it will
- // never receive it).
- pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
- unsafe {
- let prev = match *self.upgrade.get() {
- NothingSent => NothingSent,
- SendUsed => SendUsed,
- _ => panic!("upgrading again"),
- };
- ptr::write(self.upgrade.get(), GoUp(up));
-
- match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
- // If the channel is empty or has data on it, then we're good to go.
- // Senders will check the data before the upgrade (in case we
- // plastered over the DATA state).
- DATA | EMPTY => UpSuccess,
-
- // If the other end is already disconnected, then we failed the
- // upgrade. Be sure to trash the port we were given.
- DISCONNECTED => {
- ptr::replace(self.upgrade.get(), prev);
- UpDisconnected
- }
-
- // If someone's waiting, we gotta wake them up
- ptr => UpWoke(SignalToken::cast_from_usize(ptr)),
- }
- }
- }
-
- pub fn drop_chan(&self) {
- match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
- DATA | DISCONNECTED | EMPTY => {}
-
- // If someone's waiting, we gotta wake them up
- ptr => unsafe {
- SignalToken::cast_from_usize(ptr).signal();
- },
- }
- }
-
- pub fn drop_port(&self) {
- match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
- // An empty channel has nothing to do, and a remotely disconnected
- // channel also has nothing to do b/c we're about to run the drop
- // glue
- DISCONNECTED | EMPTY => {}
-
- // There's data on the channel, so make sure we destroy it promptly.
- // This is why not using an arc is a little difficult (need the box
- // to stay valid while we take the data).
- DATA => unsafe {
- (&mut *self.data.get()).take().unwrap();
- },
-
- // We're the only ones that can block on this port
- _ => unreachable!(),
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // select implementation
- ////////////////////////////////////////////////////////////////////////////
-
- // Remove a previous selecting thread from this port. This ensures that the
- // blocked thread will no longer be visible to any other threads.
- //
- // The return value indicates whether there's data on this port.
- pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
- let state = match self.state.load(Ordering::SeqCst) {
- // Each of these states means that no further activity will happen
- // with regard to abortion selection
- s @ (EMPTY | DATA | DISCONNECTED) => s,
-
- // If we've got a blocked thread, then use an atomic to gain ownership
- // of it (may fail)
- ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst),
- };
-
- // Now that we've got ownership of our state, figure out what to do
- // about it.
- match state {
- EMPTY => unreachable!(),
- // our thread used for select was stolen
- DATA => Ok(true),
-
- // If the other end has hung up, then we have complete ownership
- // of the port. First, check if there was data waiting for us. This
- // is possible if the other end sent something and then hung up.
- //
- // We then need to check to see if there was an upgrade requested,
- // and if so, the upgraded port needs to have its selection aborted.
- DISCONNECTED => unsafe {
- if (*self.data.get()).is_some() {
- Ok(true)
- } else {
- match ptr::replace(self.upgrade.get(), SendUsed) {
- GoUp(port) => Err(port),
- _ => Ok(true),
- }
- }
- },
-
- // We woke ourselves up from select.
- ptr => unsafe {
- drop(SignalToken::cast_from_usize(ptr));
- Ok(false)
- },
- }
- }
-}
-
-impl<T> Drop for Packet<T> {
- fn drop(&mut self) {
- assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
- }
-}
diff --git a/src/libstd/sync/mpsc/shared.rs b/src/libstd/sync/mpsc/shared.rs
deleted file mode 100644
index 898654f21f2..00000000000
--- a/src/libstd/sync/mpsc/shared.rs
+++ /dev/null
@@ -1,489 +0,0 @@
-/// Shared channels.
-///
-/// This is the flavor of channels which are not necessarily optimized for any
-/// particular use case, but are the most general in how they are used. Shared
-/// channels are cloneable allowing for multiple senders.
-///
-/// High level implementation details can be found in the comment of the parent
-/// module. You'll also note that the implementation of the shared and stream
-/// channels are quite similar, and this is no coincidence!
-pub use self::Failure::*;
-use self::StartResult::*;
-
-use core::cmp;
-use core::intrinsics::abort;
-
-use crate::cell::UnsafeCell;
-use crate::ptr;
-use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken};
-use crate::sync::mpsc::mpsc_queue as mpsc;
-use crate::sync::{Mutex, MutexGuard};
-use crate::thread;
-use crate::time::Instant;
-
-const DISCONNECTED: isize = isize::MIN;
-const FUDGE: isize = 1024;
-const MAX_REFCOUNT: usize = (isize::MAX) as usize;
-#[cfg(test)]
-const MAX_STEALS: isize = 5;
-#[cfg(not(test))]
-const MAX_STEALS: isize = 1 << 20;
-
-pub struct Packet<T> {
- queue: mpsc::Queue<T>,
- cnt: AtomicIsize, // How many items are on this channel
- steals: UnsafeCell<isize>, // How many times has a port received without blocking?
- to_wake: AtomicUsize, // SignalToken for wake up
-
- // The number of channels which are currently using this packet.
- channels: AtomicUsize,
-
- // See the discussion in Port::drop and the channel send methods for what
- // these are used for
- port_dropped: AtomicBool,
- sender_drain: AtomicIsize,
-
- // this lock protects various portions of this implementation during
- // select()
- select_lock: Mutex<()>,
-}
-
-pub enum Failure {
- Empty,
- Disconnected,
-}
-
-#[derive(PartialEq, Eq)]
-enum StartResult {
- Installed,
- Abort,
-}
-
-impl<T> Packet<T> {
- // Creation of a packet *must* be followed by a call to postinit_lock
- // and later by inherit_blocker
- pub fn new() -> Packet<T> {
- Packet {
- queue: mpsc::Queue::new(),
- cnt: AtomicIsize::new(0),
- steals: UnsafeCell::new(0),
- to_wake: AtomicUsize::new(0),
- channels: AtomicUsize::new(2),
- port_dropped: AtomicBool::new(false),
- sender_drain: AtomicIsize::new(0),
- select_lock: Mutex::new(()),
- }
- }
-
- // This function should be used after newly created Packet
- // was wrapped with an Arc
- // In other case mutex data will be duplicated while cloning
- // and that could cause problems on platforms where it is
- // represented by opaque data structure
- pub fn postinit_lock(&self) -> MutexGuard<'_, ()> {
- self.select_lock.lock().unwrap()
- }
-
- // This function is used at the creation of a shared packet to inherit a
- // previously blocked thread. This is done to prevent spurious wakeups of
- // threads in select().
- //
- // This can only be called at channel-creation time
- pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
- if let Some(token) = token {
- assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
- self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst);
- self.cnt.store(-1, Ordering::SeqCst);
-
- // This store is a little sketchy. What's happening here is that
- // we're transferring a blocker from a oneshot or stream channel to
- // this shared channel. In doing so, we never spuriously wake them
- // up and rather only wake them up at the appropriate time. This
- // implementation of shared channels assumes that any blocking
- // recv() will undo the increment of steals performed in try_recv()
- // once the recv is complete. This thread that we're inheriting,
- // however, is not in the middle of recv. Hence, the first time we
- // wake them up, they're going to wake up from their old port, move
- // on to the upgraded port, and then call the block recv() function.
- //
- // When calling this function, they'll find there's data immediately
- // available, counting it as a steal. This in fact wasn't a steal
- // because we appropriately blocked them waiting for data.
- //
- // To offset this bad increment, we initially set the steal count to
- // -1. You'll find some special code in abort_selection() as well to
- // ensure that this -1 steal count doesn't escape too far.
- unsafe {
- *self.steals.get() = -1;
- }
- }
-
- // When the shared packet is constructed, we grabbed this lock. The
- // purpose of this lock is to ensure that abort_selection() doesn't
- // interfere with this method. After we unlock this lock, we're
- // signifying that we're done modifying self.cnt and self.to_wake and
- // the port is ready for the world to continue using it.
- drop(guard);
- }
-
- pub fn send(&self, t: T) -> Result<(), T> {
- // See Port::drop for what's going on
- if self.port_dropped.load(Ordering::SeqCst) {
- return Err(t);
- }
-
- // Note that the multiple sender case is a little trickier
- // semantically than the single sender case. The logic for
- // incrementing is "add and if disconnected store disconnected".
- // This could end up leading some senders to believe that there
- // wasn't a disconnect if in fact there was a disconnect. This means
- // that while one thread is attempting to re-store the disconnected
- // states, other threads could walk through merrily incrementing
- // this very-negative disconnected count. To prevent senders from
- // spuriously attempting to send when the channels is actually
- // disconnected, the count has a ranged check here.
- //
- // This is also done for another reason. Remember that the return
- // value of this function is:
- //
- // `true` == the data *may* be received, this essentially has no
- // meaning
- // `false` == the data will *never* be received, this has a lot of
- // meaning
- //
- // In the SPSC case, we have a check of 'queue.is_empty()' to see
- // whether the data was actually received, but this same condition
- // means nothing in a multi-producer context. As a result, this
- // preflight check serves as the definitive "this will never be
- // received". Once we get beyond this check, we have permanently
- // entered the realm of "this may be received"
- if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
- return Err(t);
- }
-
- self.queue.push(t);
- match self.cnt.fetch_add(1, Ordering::SeqCst) {
- -1 => {
- self.take_to_wake().signal();
- }
-
- // In this case, we have possibly failed to send our data, and
- // we need to consider re-popping the data in order to fully
- // destroy it. We must arbitrate among the multiple senders,
- // however, because the queues that we're using are
- // single-consumer queues. In order to do this, all exiting
- // pushers will use an atomic count in order to count those
- // flowing through. Pushers who see 0 are required to drain as
- // much as possible, and then can only exit when they are the
- // only pusher (otherwise they must try again).
- n if n < DISCONNECTED + FUDGE => {
- // see the comment in 'try' for a shared channel for why this
- // window of "not disconnected" is ok.
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
-
- if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 {
- loop {
- // drain the queue, for info on the thread yield see the
- // discussion in try_recv
- loop {
- match self.queue.pop() {
- mpsc::Data(..) => {}
- mpsc::Empty => break,
- mpsc::Inconsistent => thread::yield_now(),
- }
- }
- // maybe we're done, if we're not the last ones
- // here, then we need to go try again.
- if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 {
- break;
- }
- }
-
- // At this point, there may still be data on the queue,
- // but only if the count hasn't been incremented and
- // some other sender hasn't finished pushing data just
- // yet. That sender in question will drain its own data.
- }
- }
-
- // Can't make any assumptions about this case like in the SPSC case.
- _ => {}
- }
-
- Ok(())
- }
-
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
- // This code is essentially the exact same as that found in the stream
- // case (see stream.rs)
- match self.try_recv() {
- Err(Empty) => {}
- data => return data,
- }
-
- let (wait_token, signal_token) = blocking::tokens();
- if self.decrement(signal_token) == Installed {
- if let Some(deadline) = deadline {
- let timed_out = !wait_token.wait_max_until(deadline);
- if timed_out {
- self.abort_selection(false);
- }
- } else {
- wait_token.wait();
- }
- }
-
- match self.try_recv() {
- data @ Ok(..) => unsafe {
- *self.steals.get() -= 1;
- data
- },
- data => data,
- }
- }
-
- // Essentially the exact same thing as the stream decrement function.
- // Returns true if blocking should proceed.
- fn decrement(&self, token: SignalToken) -> StartResult {
- unsafe {
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
- let ptr = token.cast_to_usize();
- self.to_wake.store(ptr, Ordering::SeqCst);
-
- let steals = ptr::replace(self.steals.get(), 0);
-
- match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
- DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
- }
- // If we factor in our steals and notice that the channel has no
- // data, we successfully sleep
- n => {
- assert!(n >= 0);
- if n - steals <= 0 {
- return Installed;
- }
- }
- }
-
- self.to_wake.store(0, Ordering::SeqCst);
- drop(SignalToken::cast_from_usize(ptr));
- Abort
- }
- }
-
- pub fn try_recv(&self) -> Result<T, Failure> {
- let ret = match self.queue.pop() {
- mpsc::Data(t) => Some(t),
- mpsc::Empty => None,
-
- // This is a bit of an interesting case. The channel is reported as
- // having data available, but our pop() has failed due to the queue
- // being in an inconsistent state. This means that there is some
- // pusher somewhere which has yet to complete, but we are guaranteed
- // that a pop will eventually succeed. In this case, we spin in a
- // yield loop because the remote sender should finish their enqueue
- // operation "very quickly".
- //
- // Avoiding this yield loop would require a different queue
- // abstraction which provides the guarantee that after M pushes have
- // succeeded, at least M pops will succeed. The current queues
- // guarantee that if there are N active pushes, you can pop N times
- // once all N have finished.
- mpsc::Inconsistent => {
- let data;
- loop {
- thread::yield_now();
- match self.queue.pop() {
- mpsc::Data(t) => {
- data = t;
- break;
- }
- mpsc::Empty => panic!("inconsistent => empty"),
- mpsc::Inconsistent => {}
- }
- }
- Some(data)
- }
- };
- match ret {
- // See the discussion in the stream implementation for why we
- // might decrement steals.
- Some(data) => unsafe {
- if *self.steals.get() > MAX_STEALS {
- match self.cnt.swap(0, Ordering::SeqCst) {
- DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
- }
- n => {
- let m = cmp::min(n, *self.steals.get());
- *self.steals.get() -= m;
- self.bump(n - m);
- }
- }
- assert!(*self.steals.get() >= 0);
- }
- *self.steals.get() += 1;
- Ok(data)
- },
-
- // See the discussion in the stream implementation for why we try
- // again.
- None => {
- match self.cnt.load(Ordering::SeqCst) {
- n if n != DISCONNECTED => Err(Empty),
- _ => {
- match self.queue.pop() {
- mpsc::Data(t) => Ok(t),
- mpsc::Empty => Err(Disconnected),
- // with no senders, an inconsistency is impossible.
- mpsc::Inconsistent => unreachable!(),
- }
- }
- }
- }
- }
- }
-
- // Prepares this shared packet for a channel clone, essentially just bumping
- // a refcount.
- pub fn clone_chan(&self) {
- let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
-
- // See comments on Arc::clone() on why we do this (for `mem::forget`).
- if old_count > MAX_REFCOUNT {
- abort();
- }
- }
-
- // Decrement the reference count on a channel. This is called whenever a
- // Chan is dropped and may end up waking up a receiver. It's the receiver's
- // responsibility on the other end to figure out that we've disconnected.
- pub fn drop_chan(&self) {
- match self.channels.fetch_sub(1, Ordering::SeqCst) {
- 1 => {}
- n if n > 1 => return,
- n => panic!("bad number of channels left {}", n),
- }
-
- match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
- -1 => {
- self.take_to_wake().signal();
- }
- DISCONNECTED => {}
- n => {
- assert!(n >= 0);
- }
- }
- }
-
- // See the long discussion inside of stream.rs for why the queue is drained,
- // and why it is done in this fashion.
- pub fn drop_port(&self) {
- self.port_dropped.store(true, Ordering::SeqCst);
- let mut steals = unsafe { *self.steals.get() };
- while {
- let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst);
- cnt != DISCONNECTED && cnt != steals
- } {
- // See the discussion in 'try_recv' for why we yield
- // control of this thread.
- loop {
- match self.queue.pop() {
- mpsc::Data(..) => {
- steals += 1;
- }
- mpsc::Empty | mpsc::Inconsistent => break,
- }
- }
- }
- }
-
- // Consumes ownership of the 'to_wake' field.
- fn take_to_wake(&self) -> SignalToken {
- let ptr = self.to_wake.load(Ordering::SeqCst);
- self.to_wake.store(0, Ordering::SeqCst);
- assert!(ptr != 0);
- unsafe { SignalToken::cast_from_usize(ptr) }
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // select implementation
- ////////////////////////////////////////////////////////////////////////////
-
- // increment the count on the channel (used for selection)
- fn bump(&self, amt: isize) -> isize {
- match self.cnt.fetch_add(amt, Ordering::SeqCst) {
- DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
- DISCONNECTED
- }
- n => n,
- }
- }
-
- // Cancels a previous thread waiting on this port, returning whether there's
- // data on the port.
- //
- // This is similar to the stream implementation (hence fewer comments), but
- // uses a different value for the "steals" variable.
- pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
- // Before we do anything else, we bounce on this lock. The reason for
- // doing this is to ensure that any upgrade-in-progress is gone and
- // done with. Without this bounce, we can race with inherit_blocker
- // about looking at and dealing with to_wake. Once we have acquired the
- // lock, we are guaranteed that inherit_blocker is done.
- {
- let _guard = self.select_lock.lock().unwrap();
- }
-
- // Like the stream implementation, we want to make sure that the count
- // on the channel goes non-negative. We don't know how negative the
- // stream currently is, so instead of using a steal value of 1, we load
- // the channel count and figure out what we should do to make it
- // positive.
- let steals = {
- let cnt = self.cnt.load(Ordering::SeqCst);
- if cnt < 0 && cnt != DISCONNECTED { -cnt } else { 0 }
- };
- let prev = self.bump(steals + 1);
-
- if prev == DISCONNECTED {
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
- true
- } else {
- let cur = prev + steals + 1;
- assert!(cur >= 0);
- if prev < 0 {
- drop(self.take_to_wake());
- } else {
- while self.to_wake.load(Ordering::SeqCst) != 0 {
- thread::yield_now();
- }
- }
- unsafe {
- // if the number of steals is -1, it was the pre-emptive -1 steal
- // count from when we inherited a blocker. This is fine because
- // we're just going to overwrite it with a real value.
- let old = self.steals.get();
- assert!(*old == 0 || *old == -1);
- *old = steals;
- prev >= 0
- }
- }
- }
-}
-
-impl<T> Drop for Packet<T> {
- fn drop(&mut self) {
- // Note that this load is not only an assert for correctness about
- // disconnection, but also a proper fence before the read of
- // `to_wake`, so this assert cannot be removed with also removing
- // the `to_wake` assert.
- assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
- assert_eq!(self.channels.load(Ordering::SeqCst), 0);
- }
-}
diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs
deleted file mode 100644
index 0274268f69f..00000000000
--- a/src/libstd/sync/mpsc/spsc_queue.rs
+++ /dev/null
@@ -1,338 +0,0 @@
-//! A single-producer single-consumer concurrent queue
-//!
-//! This module contains the implementation of an SPSC queue which can be used
-//! concurrently between two threads. This data structure is safe to use and
-//! enforces the semantics that there is one pusher and one popper.
-
-// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
-
-use core::cell::UnsafeCell;
-use core::ptr;
-
-use crate::boxed::Box;
-use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
-
-use super::cache_aligned::CacheAligned;
-
-// Node within the linked list queue of messages to send
-struct Node<T> {
- // FIXME: this could be an uninitialized T if we're careful enough, and
- // that would reduce memory usage (and be a bit faster).
- // is it worth it?
- value: Option<T>, // nullable for re-use of nodes
- cached: bool, // This node goes into the node cache
- next: AtomicPtr<Node<T>>, // next node in the queue
-}
-
-/// The single-producer single-consumer queue. This structure is not cloneable,
-/// but it can be safely shared in an Arc if it is guaranteed that there
-/// is only one popper and one pusher touching the queue at any one point in
-/// time.
-pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
- // consumer fields
- consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
-
- // producer fields
- producer: CacheAligned<Producer<T, ProducerAddition>>,
-}
-
-struct Consumer<T, Addition> {
- tail: UnsafeCell<*mut Node<T>>, // where to pop from
- tail_prev: AtomicPtr<Node<T>>, // where to pop from
- cache_bound: usize, // maximum cache size
- cached_nodes: AtomicUsize, // number of nodes marked as cacheable
- addition: Addition,
-}
-
-struct Producer<T, Addition> {
- head: UnsafeCell<*mut Node<T>>, // where to push to
- first: UnsafeCell<*mut Node<T>>, // where to get new nodes from
- tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
- addition: Addition,
-}
-
-unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {}
-
-unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {}
-
-impl<T> Node<T> {
- fn new() -> *mut Node<T> {
- Box::into_raw(box Node {
- value: None,
- cached: false,
- next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
- })
- }
-}
-
-impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
- /// Creates a new queue. With given additional elements in the producer and
- /// consumer portions of the queue.
- ///
- /// Due to the performance implications of cache-contention,
- /// we wish to keep fields used mainly by the producer on a separate cache
- /// line than those used by the consumer.
- /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
- /// allocate one for small fields, so we allow users to insert additional
- /// fields into the cache lines already allocated by this for the producer
- /// and consumer.
- ///
- /// This is unsafe as the type system doesn't enforce a single
- /// consumer-producer relationship. It also allows the consumer to `pop`
- /// items while there is a `peek` active due to all methods having a
- /// non-mutable receiver.
- ///
- /// # Arguments
- ///
- /// * `bound` - This queue implementation is implemented with a linked
- /// list, and this means that a push is always a malloc. In
- /// order to amortize this cost, an internal cache of nodes is
- /// maintained to prevent a malloc from always being
- /// necessary. This bound is the limit on the size of the
- /// cache (if desired). If the value is 0, then the cache has
- /// no bound. Otherwise, the cache will never grow larger than
- /// `bound` (although the queue itself could be much larger.
- pub unsafe fn with_additions(
- bound: usize,
- producer_addition: ProducerAddition,
- consumer_addition: ConsumerAddition,
- ) -> Self {
- let n1 = Node::new();
- let n2 = Node::new();
- (*n1).next.store(n2, Ordering::Relaxed);
- Queue {
- consumer: CacheAligned::new(Consumer {
- tail: UnsafeCell::new(n2),
- tail_prev: AtomicPtr::new(n1),
- cache_bound: bound,
- cached_nodes: AtomicUsize::new(0),
- addition: consumer_addition,
- }),
- producer: CacheAligned::new(Producer {
- head: UnsafeCell::new(n2),
- first: UnsafeCell::new(n1),
- tail_copy: UnsafeCell::new(n1),
- addition: producer_addition,
- }),
- }
- }
-
- /// Pushes a new value onto this queue. Note that to use this function
- /// safely, it must be externally guaranteed that there is only one pusher.
- pub fn push(&self, t: T) {
- unsafe {
- // Acquire a node (which either uses a cached one or allocates a new
- // one), and then append this to the 'head' node.
- let n = self.alloc();
- assert!((*n).value.is_none());
- (*n).value = Some(t);
- (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
- (**self.producer.head.get()).next.store(n, Ordering::Release);
- *(&self.producer.head).get() = n;
- }
- }
-
- unsafe fn alloc(&self) -> *mut Node<T> {
- // First try to see if we can consume the 'first' node for our uses.
- if *self.producer.first.get() != *self.producer.tail_copy.get() {
- let ret = *self.producer.first.get();
- *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
- return ret;
- }
- // If the above fails, then update our copy of the tail and try
- // again.
- *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire);
- if *self.producer.first.get() != *self.producer.tail_copy.get() {
- let ret = *self.producer.first.get();
- *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
- return ret;
- }
- // If all of that fails, then we have to allocate a new node
- // (there's nothing in the node cache).
- Node::new()
- }
-
- /// Attempts to pop a value from this queue. Remember that to use this type
- /// safely you must ensure that there is only one popper at a time.
- pub fn pop(&self) -> Option<T> {
- unsafe {
- // The `tail` node is not actually a used node, but rather a
- // sentinel from where we should start popping from. Hence, look at
- // tail's next field and see if we can use it. If we do a pop, then
- // the current tail node is a candidate for going into the cache.
- let tail = *self.consumer.tail.get();
- let next = (*tail).next.load(Ordering::Acquire);
- if next.is_null() {
- return None;
- }
- assert!((*next).value.is_some());
- let ret = (*next).value.take();
-
- *self.consumer.0.tail.get() = next;
- if self.consumer.cache_bound == 0 {
- self.consumer.tail_prev.store(tail, Ordering::Release);
- } else {
- let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
- if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
- self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
- (*tail).cached = true;
- }
-
- if (*tail).cached {
- self.consumer.tail_prev.store(tail, Ordering::Release);
- } else {
- (*self.consumer.tail_prev.load(Ordering::Relaxed))
- .next
- .store(next, Ordering::Relaxed);
- // We have successfully erased all references to 'tail', so
- // now we can safely drop it.
- let _: Box<Node<T>> = Box::from_raw(tail);
- }
- }
- ret
- }
- }
-
- /// Attempts to peek at the head of the queue, returning `None` if the queue
- /// has no data currently
- ///
- /// # Warning
- /// The reference returned is invalid if it is not used before the consumer
- /// pops the value off the queue. If the producer then pushes another value
- /// onto the queue, it will overwrite the value pointed to by the reference.
- pub fn peek(&self) -> Option<&mut T> {
- // This is essentially the same as above with all the popping bits
- // stripped out.
- unsafe {
- let tail = *self.consumer.tail.get();
- let next = (*tail).next.load(Ordering::Acquire);
- if next.is_null() { None } else { (*next).value.as_mut() }
- }
- }
-
- pub fn producer_addition(&self) -> &ProducerAddition {
- &self.producer.addition
- }
-
- pub fn consumer_addition(&self) -> &ConsumerAddition {
- &self.consumer.addition
- }
-}
-
-impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
- fn drop(&mut self) {
- unsafe {
- let mut cur = *self.producer.first.get();
- while !cur.is_null() {
- let next = (*cur).next.load(Ordering::Relaxed);
- let _n: Box<Node<T>> = Box::from_raw(cur);
- cur = next;
- }
- }
- }
-}
-
-#[cfg(all(test, not(target_os = "emscripten")))]
-mod tests {
- use super::Queue;
- use crate::sync::mpsc::channel;
- use crate::sync::Arc;
- use crate::thread;
-
- #[test]
- fn smoke() {
- unsafe {
- let queue = Queue::with_additions(0, (), ());
- queue.push(1);
- queue.push(2);
- assert_eq!(queue.pop(), Some(1));
- assert_eq!(queue.pop(), Some(2));
- assert_eq!(queue.pop(), None);
- queue.push(3);
- queue.push(4);
- assert_eq!(queue.pop(), Some(3));
- assert_eq!(queue.pop(), Some(4));
- assert_eq!(queue.pop(), None);
- }
- }
-
- #[test]
- fn peek() {
- unsafe {
- let queue = Queue::with_additions(0, (), ());
- queue.push(vec![1]);
-
- // Ensure the borrowchecker works
- match queue.peek() {
- Some(vec) => {
- assert_eq!(&*vec, &[1]);
- }
- None => unreachable!(),
- }
-
- match queue.pop() {
- Some(vec) => {
- assert_eq!(&*vec, &[1]);
- }
- None => unreachable!(),
- }
- }
- }
-
- #[test]
- fn drop_full() {
- unsafe {
- let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
- q.push(box 1);
- q.push(box 2);
- }
- }
-
- #[test]
- fn smoke_bound() {
- unsafe {
- let q = Queue::with_additions(0, (), ());
- q.push(1);
- q.push(2);
- assert_eq!(q.pop(), Some(1));
- assert_eq!(q.pop(), Some(2));
- assert_eq!(q.pop(), None);
- q.push(3);
- q.push(4);
- assert_eq!(q.pop(), Some(3));
- assert_eq!(q.pop(), Some(4));
- assert_eq!(q.pop(), None);
- }
- }
-
- #[test]
- fn stress() {
- unsafe {
- stress_bound(0);
- stress_bound(1);
- }
-
- unsafe fn stress_bound(bound: usize) {
- let q = Arc::new(Queue::with_additions(bound, (), ()));
-
- let (tx, rx) = channel();
- let q2 = q.clone();
- let _t = thread::spawn(move || {
- for _ in 0..100000 {
- loop {
- match q2.pop() {
- Some(1) => break,
- Some(_) => panic!(),
- None => {}
- }
- }
- }
- tx.send(()).unwrap();
- });
- for _ in 0..100000 {
- q.push(1);
- }
- rx.recv().unwrap();
- }
- }
-}
diff --git a/src/libstd/sync/mpsc/stream.rs b/src/libstd/sync/mpsc/stream.rs
deleted file mode 100644
index 9f7c1af8951..00000000000
--- a/src/libstd/sync/mpsc/stream.rs
+++ /dev/null
@@ -1,453 +0,0 @@
-/// Stream channels
-///
-/// This is the flavor of channels which are optimized for one sender and one
-/// receiver. The sender will be upgraded to a shared channel if the channel is
-/// cloned.
-///
-/// High level implementation details can be found in the comment of the parent
-/// module.
-pub use self::Failure::*;
-use self::Message::*;
-pub use self::UpgradeResult::*;
-
-use core::cmp;
-
-use crate::cell::UnsafeCell;
-use crate::ptr;
-use crate::thread;
-use crate::time::Instant;
-
-use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken};
-use crate::sync::mpsc::spsc_queue as spsc;
-use crate::sync::mpsc::Receiver;
-
-const DISCONNECTED: isize = isize::MIN;
-#[cfg(test)]
-const MAX_STEALS: isize = 5;
-#[cfg(not(test))]
-const MAX_STEALS: isize = 1 << 20;
-
-pub struct Packet<T> {
- // internal queue for all messages
- queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
-}
-
-struct ProducerAddition {
- cnt: AtomicIsize, // How many items are on this channel
- to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
-
- port_dropped: AtomicBool, // flag if the channel has been destroyed.
-}
-
-struct ConsumerAddition {
- steals: UnsafeCell<isize>, // How many times has a port received without blocking?
-}
-
-pub enum Failure<T> {
- Empty,
- Disconnected,
- Upgraded(Receiver<T>),
-}
-
-pub enum UpgradeResult {
- UpSuccess,
- UpDisconnected,
- UpWoke(SignalToken),
-}
-
-// Any message could contain an "upgrade request" to a new shared port, so the
-// internal queue it's a queue of T, but rather Message<T>
-enum Message<T> {
- Data(T),
- GoUp(Receiver<T>),
-}
-
-impl<T> Packet<T> {
- pub fn new() -> Packet<T> {
- Packet {
- queue: unsafe {
- spsc::Queue::with_additions(
- 128,
- ProducerAddition {
- cnt: AtomicIsize::new(0),
- to_wake: AtomicUsize::new(0),
-
- port_dropped: AtomicBool::new(false),
- },
- ConsumerAddition { steals: UnsafeCell::new(0) },
- )
- },
- }
- }
-
- pub fn send(&self, t: T) -> Result<(), T> {
- // If the other port has deterministically gone away, then definitely
- // must return the data back up the stack. Otherwise, the data is
- // considered as being sent.
- if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
- return Err(t);
- }
-
- match self.do_send(Data(t)) {
- UpSuccess | UpDisconnected => {}
- UpWoke(token) => {
- token.signal();
- }
- }
- Ok(())
- }
-
- pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
- // If the port has gone away, then there's no need to proceed any
- // further.
- if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
- return UpDisconnected;
- }
-
- self.do_send(GoUp(up))
- }
-
- fn do_send(&self, t: Message<T>) -> UpgradeResult {
- self.queue.push(t);
- match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
- // As described in the mod's doc comment, -1 == wakeup
- -1 => UpWoke(self.take_to_wake()),
- // As as described before, SPSC queues must be >= -2
- -2 => UpSuccess,
-
- // Be sure to preserve the disconnected state, and the return value
- // in this case is going to be whether our data was received or not.
- // This manifests itself on whether we have an empty queue or not.
- //
- // Primarily, are required to drain the queue here because the port
- // will never remove this data. We can only have at most one item to
- // drain (the port drains the rest).
- DISCONNECTED => {
- self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
- let first = self.queue.pop();
- let second = self.queue.pop();
- assert!(second.is_none());
-
- match first {
- Some(..) => UpSuccess, // we failed to send the data
- None => UpDisconnected, // we successfully sent data
- }
- }
-
- // Otherwise we just sent some data on a non-waiting queue, so just
- // make sure the world is sane and carry on!
- n => {
- assert!(n >= 0);
- UpSuccess
- }
- }
- }
-
- // Consumes ownership of the 'to_wake' field.
- fn take_to_wake(&self) -> SignalToken {
- let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
- self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
- assert!(ptr != 0);
- unsafe { SignalToken::cast_from_usize(ptr) }
- }
-
- // Decrements the count on the channel for a sleeper, returning the sleeper
- // back if it shouldn't sleep. Note that this is the location where we take
- // steals into account.
- fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
- let ptr = unsafe { token.cast_to_usize() };
- self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
-
- let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
-
- match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
- DISCONNECTED => {
- self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
- }
- // If we factor in our steals and notice that the channel has no
- // data, we successfully sleep
- n => {
- assert!(n >= 0);
- if n - steals <= 0 {
- return Ok(());
- }
- }
- }
-
- self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
- Err(unsafe { SignalToken::cast_from_usize(ptr) })
- }
-
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
- // Optimistic preflight check (scheduling is expensive).
- match self.try_recv() {
- Err(Empty) => {}
- data => return data,
- }
-
- // Welp, our channel has no data. Deschedule the current thread and
- // initiate the blocking protocol.
- let (wait_token, signal_token) = blocking::tokens();
- if self.decrement(signal_token).is_ok() {
- if let Some(deadline) = deadline {
- let timed_out = !wait_token.wait_max_until(deadline);
- if timed_out {
- self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded)?;
- }
- } else {
- wait_token.wait();
- }
- }
-
- match self.try_recv() {
- // Messages which actually popped from the queue shouldn't count as
- // a steal, so offset the decrement here (we already have our
- // "steal" factored into the channel count above).
- data @ (Ok(..) | Err(Upgraded(..))) => unsafe {
- *self.queue.consumer_addition().steals.get() -= 1;
- data
- },
-
- data => data,
- }
- }
-
- pub fn try_recv(&self) -> Result<T, Failure<T>> {
- match self.queue.pop() {
- // If we stole some data, record to that effect (this will be
- // factored into cnt later on).
- //
- // Note that we don't allow steals to grow without bound in order to
- // prevent eventual overflow of either steals or cnt as an overflow
- // would have catastrophic results. Sometimes, steals > cnt, but
- // other times cnt > steals, so we don't know the relation between
- // steals and cnt. This code path is executed only rarely, so we do
- // a pretty slow operation, of swapping 0 into cnt, taking steals
- // down as much as possible (without going negative), and then
- // adding back in whatever we couldn't factor into steals.
- Some(data) => unsafe {
- if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
- match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
- DISCONNECTED => {
- self.queue
- .producer_addition()
- .cnt
- .store(DISCONNECTED, Ordering::SeqCst);
- }
- n => {
- let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
- *self.queue.consumer_addition().steals.get() -= m;
- self.bump(n - m);
- }
- }
- assert!(*self.queue.consumer_addition().steals.get() >= 0);
- }
- *self.queue.consumer_addition().steals.get() += 1;
- match data {
- Data(t) => Ok(t),
- GoUp(up) => Err(Upgraded(up)),
- }
- },
-
- None => {
- match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
- n if n != DISCONNECTED => Err(Empty),
-
- // This is a little bit of a tricky case. We failed to pop
- // data above, and then we have viewed that the channel is
- // disconnected. In this window more data could have been
- // sent on the channel. It doesn't really make sense to
- // return that the channel is disconnected when there's
- // actually data on it, so be extra sure there's no data by
- // popping one more time.
- //
- // We can ignore steals because the other end is
- // disconnected and we'll never need to really factor in our
- // steals again.
- _ => match self.queue.pop() {
- Some(Data(t)) => Ok(t),
- Some(GoUp(up)) => Err(Upgraded(up)),
- None => Err(Disconnected),
- },
- }
- }
- }
- }
-
- pub fn drop_chan(&self) {
- // Dropping a channel is pretty simple, we just flag it as disconnected
- // and then wakeup a blocker if there is one.
- match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
- -1 => {
- self.take_to_wake().signal();
- }
- DISCONNECTED => {}
- n => {
- assert!(n >= 0);
- }
- }
- }
-
- pub fn drop_port(&self) {
- // Dropping a port seems like a fairly trivial thing. In theory all we
- // need to do is flag that we're disconnected and then everything else
- // can take over (we don't have anyone to wake up).
- //
- // The catch for Ports is that we want to drop the entire contents of
- // the queue. There are multiple reasons for having this property, the
- // largest of which is that if another chan is waiting in this channel
- // (but not received yet), then waiting on that port will cause a
- // deadlock.
- //
- // So if we accept that we must now destroy the entire contents of the
- // queue, this code may make a bit more sense. The tricky part is that
- // we can't let any in-flight sends go un-dropped, we have to make sure
- // *everything* is dropped and nothing new will come onto the channel.
-
- // The first thing we do is set a flag saying that we're done for. All
- // sends are gated on this flag, so we're immediately guaranteed that
- // there are a bounded number of active sends that we'll have to deal
- // with.
- self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
-
- // Now that we're guaranteed to deal with a bounded number of senders,
- // we need to drain the queue. This draining process happens atomically
- // with respect to the "count" of the channel. If the count is nonzero
- // (with steals taken into account), then there must be data on the
- // channel. In this case we drain everything and then try again. We will
- // continue to fail while active senders send data while we're dropping
- // data, but eventually we're guaranteed to break out of this loop
- // (because there is a bounded number of senders).
- let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
- while {
- let cnt = self.queue.producer_addition().cnt.compare_and_swap(
- steals,
- DISCONNECTED,
- Ordering::SeqCst,
- );
- cnt != DISCONNECTED && cnt != steals
- } {
- while self.queue.pop().is_some() {
- steals += 1;
- }
- }
-
- // At this point in time, we have gated all future senders from sending,
- // and we have flagged the channel as being disconnected. The senders
- // still have some responsibility, however, because some sends may not
- // complete until after we flag the disconnection. There are more
- // details in the sending methods that see DISCONNECTED
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // select implementation
- ////////////////////////////////////////////////////////////////////////////
-
- // increment the count on the channel (used for selection)
- fn bump(&self, amt: isize) -> isize {
- match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
- DISCONNECTED => {
- self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
- DISCONNECTED
- }
- n => n,
- }
- }
-
- // Removes a previous thread from being blocked in this port
- pub fn abort_selection(&self, was_upgrade: bool) -> Result<bool, Receiver<T>> {
- // If we're aborting selection after upgrading from a oneshot, then
- // we're guarantee that no one is waiting. The only way that we could
- // have seen the upgrade is if data was actually sent on the channel
- // half again. For us, this means that there is guaranteed to be data on
- // this channel. Furthermore, we're guaranteed that there was no
- // start_selection previously, so there's no need to modify `self.cnt`
- // at all.
- //
- // Hence, because of these invariants, we immediately return `Ok(true)`.
- // Note that the data may not actually be sent on the channel just yet.
- // The other end could have flagged the upgrade but not sent data to
- // this end. This is fine because we know it's a small bounded windows
- // of time until the data is actually sent.
- if was_upgrade {
- assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
- return Ok(true);
- }
-
- // We want to make sure that the count on the channel goes non-negative,
- // and in the stream case we can have at most one steal, so just assume
- // that we had one steal.
- let steals = 1;
- let prev = self.bump(steals + 1);
-
- // If we were previously disconnected, then we know for sure that there
- // is no thread in to_wake, so just keep going
- let has_data = if prev == DISCONNECTED {
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
- true // there is data, that data is that we're disconnected
- } else {
- let cur = prev + steals + 1;
- assert!(cur >= 0);
-
- // If the previous count was negative, then we just made things go
- // positive, hence we passed the -1 boundary and we're responsible
- // for removing the to_wake() field and trashing it.
- //
- // If the previous count was positive then we're in a tougher
- // situation. A possible race is that a sender just incremented
- // through -1 (meaning it's going to try to wake a thread up), but it
- // hasn't yet read the to_wake. In order to prevent a future recv()
- // from waking up too early (this sender picking up the plastered
- // over to_wake), we spin loop here waiting for to_wake to be 0.
- // Note that this entire select() implementation needs an overhaul,
- // and this is *not* the worst part of it, so this is not done as a
- // final solution but rather out of necessity for now to get
- // something working.
- if prev < 0 {
- drop(self.take_to_wake());
- } else {
- while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
- thread::yield_now();
- }
- }
- unsafe {
- assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
- *self.queue.consumer_addition().steals.get() = steals;
- }
-
- // if we were previously positive, then there's surely data to
- // receive
- prev >= 0
- };
-
- // Now that we've determined that this queue "has data", we peek at the
- // queue to see if the data is an upgrade or not. If it's an upgrade,
- // then we need to destroy this port and abort selection on the
- // upgraded port.
- if has_data {
- match self.queue.peek() {
- Some(&mut GoUp(..)) => match self.queue.pop() {
- Some(GoUp(port)) => Err(port),
- _ => unreachable!(),
- },
- _ => Ok(true),
- }
- } else {
- Ok(false)
- }
- }
-}
-
-impl<T> Drop for Packet<T> {
- fn drop(&mut self) {
- // Note that this load is not only an assert for correctness about
- // disconnection, but also a proper fence before the read of
- // `to_wake`, so this assert cannot be removed with also removing
- // the `to_wake` assert.
- assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
- }
-}
diff --git a/src/libstd/sync/mpsc/sync.rs b/src/libstd/sync/mpsc/sync.rs
deleted file mode 100644
index 733761671a0..00000000000
--- a/src/libstd/sync/mpsc/sync.rs
+++ /dev/null
@@ -1,495 +0,0 @@
-use self::Blocker::*;
-/// Synchronous channels/ports
-///
-/// This channel implementation differs significantly from the asynchronous
-/// implementations found next to it (oneshot/stream/share). This is an
-/// implementation of a synchronous, bounded buffer channel.
-///
-/// Each channel is created with some amount of backing buffer, and sends will
-/// *block* until buffer space becomes available. A buffer size of 0 is valid,
-/// which means that every successful send is paired with a successful recv.
-///
-/// This flavor of channels defines a new `send_opt` method for channels which
-/// is the method by which a message is sent but the thread does not panic if it
-/// cannot be delivered.
-///
-/// Another major difference is that send() will *always* return back the data
-/// if it couldn't be sent. This is because it is deterministically known when
-/// the data is received and when it is not received.
-///
-/// Implementation-wise, it can all be summed up with "use a mutex plus some
-/// logic". The mutex used here is an OS native mutex, meaning that no user code
-/// is run inside of the mutex (to prevent context switching). This
-/// implementation shares almost all code for the buffered and unbuffered cases
-/// of a synchronous channel. There are a few branches for the unbuffered case,
-/// but they're mostly just relevant to blocking senders.
-pub use self::Failure::*;
-
-use core::intrinsics::abort;
-use core::mem;
-use core::ptr;
-
-use crate::sync::atomic::{AtomicUsize, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken};
-use crate::sync::{Mutex, MutexGuard};
-use crate::time::Instant;
-
-const MAX_REFCOUNT: usize = (isize::MAX) as usize;
-
-pub struct Packet<T> {
- /// Only field outside of the mutex. Just done for kicks, but mainly because
- /// the other shared channel already had the code implemented
- channels: AtomicUsize,
-
- lock: Mutex<State<T>>,
-}
-
-unsafe impl<T: Send> Send for Packet<T> {}
-
-unsafe impl<T: Send> Sync for Packet<T> {}
-
-struct State<T> {
- disconnected: bool, // Is the channel disconnected yet?
- queue: Queue, // queue of senders waiting to send data
- blocker: Blocker, // currently blocked thread on this channel
- buf: Buffer<T>, // storage for buffered messages
- cap: usize, // capacity of this channel
-
- /// A curious flag used to indicate whether a sender failed or succeeded in
- /// blocking. This is used to transmit information back to the thread that it
- /// must dequeue its message from the buffer because it was not received.
- /// This is only relevant in the 0-buffer case. This obviously cannot be
- /// safely constructed, but it's guaranteed to always have a valid pointer
- /// value.
- canceled: Option<&'static mut bool>,
-}
-
-unsafe impl<T: Send> Send for State<T> {}
-
-/// Possible flavors of threads who can be blocked on this channel.
-enum Blocker {
- BlockedSender(SignalToken),
- BlockedReceiver(SignalToken),
- NoneBlocked,
-}
-
-/// Simple queue for threading threads together. Nodes are stack-allocated, so
-/// this structure is not safe at all
-struct Queue {
- head: *mut Node,
- tail: *mut Node,
-}
-
-struct Node {
- token: Option<SignalToken>,
- next: *mut Node,
-}
-
-unsafe impl Send for Node {}
-
-/// A simple ring-buffer
-struct Buffer<T> {
- buf: Vec<Option<T>>,
- start: usize,
- size: usize,
-}
-
-#[derive(Debug)]
-pub enum Failure {
- Empty,
- Disconnected,
-}
-
-/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
-/// in the meantime. This re-locks the mutex upon returning.
-fn wait<'a, 'b, T>(
- lock: &'a Mutex<State<T>>,
- mut guard: MutexGuard<'b, State<T>>,
- f: fn(SignalToken) -> Blocker,
-) -> MutexGuard<'a, State<T>> {
- let (wait_token, signal_token) = blocking::tokens();
- match mem::replace(&mut guard.blocker, f(signal_token)) {
- NoneBlocked => {}
- _ => unreachable!(),
- }
- drop(guard); // unlock
- wait_token.wait(); // block
- lock.lock().unwrap() // relock
-}
-
-/// Same as wait, but waiting at most until `deadline`.
-fn wait_timeout_receiver<'a, 'b, T>(
- lock: &'a Mutex<State<T>>,
- deadline: Instant,
- mut guard: MutexGuard<'b, State<T>>,
- success: &mut bool,
-) -> MutexGuard<'a, State<T>> {
- let (wait_token, signal_token) = blocking::tokens();
- match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) {
- NoneBlocked => {}
- _ => unreachable!(),
- }
- drop(guard); // unlock
- *success = wait_token.wait_max_until(deadline); // block
- let mut new_guard = lock.lock().unwrap(); // relock
- if !*success {
- abort_selection(&mut new_guard);
- }
- new_guard
-}
-
-fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool {
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- NoneBlocked => true,
- BlockedSender(token) => {
- guard.blocker = BlockedSender(token);
- true
- }
- BlockedReceiver(token) => {
- drop(token);
- false
- }
- }
-}
-
-/// Wakes up a thread, dropping the lock at the correct time
-fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) {
- // We need to be careful to wake up the waiting thread *outside* of the mutex
- // in case it incurs a context switch.
- drop(guard);
- token.signal();
-}
-
-impl<T> Packet<T> {
- pub fn new(capacity: usize) -> Packet<T> {
- Packet {
- channels: AtomicUsize::new(1),
- lock: Mutex::new(State {
- disconnected: false,
- blocker: NoneBlocked,
- cap: capacity,
- canceled: None,
- queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() },
- buf: Buffer {
- buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(),
- start: 0,
- size: 0,
- },
- }),
- }
- }
-
- // wait until a send slot is available, returning locked access to
- // the channel state.
- fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> {
- let mut node = Node { token: None, next: ptr::null_mut() };
- loop {
- let mut guard = self.lock.lock().unwrap();
- // are we ready to go?
- if guard.disconnected || guard.buf.size() < guard.buf.capacity() {
- return guard;
- }
- // no room; actually block
- let wait_token = guard.queue.enqueue(&mut node);
- drop(guard);
- wait_token.wait();
- }
- }
-
- pub fn send(&self, t: T) -> Result<(), T> {
- let mut guard = self.acquire_send_slot();
- if guard.disconnected {
- return Err(t);
- }
- guard.buf.enqueue(t);
-
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- // if our capacity is 0, then we need to wait for a receiver to be
- // available to take our data. After waiting, we check again to make
- // sure the port didn't go away in the meantime. If it did, we need
- // to hand back our data.
- NoneBlocked if guard.cap == 0 => {
- let mut canceled = false;
- assert!(guard.canceled.is_none());
- guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
- let mut guard = wait(&self.lock, guard, BlockedSender);
- if canceled { Err(guard.buf.dequeue()) } else { Ok(()) }
- }
-
- // success, we buffered some data
- NoneBlocked => Ok(()),
-
- // success, someone's about to receive our buffered data.
- BlockedReceiver(token) => {
- wakeup(token, guard);
- Ok(())
- }
-
- BlockedSender(..) => panic!("lolwut"),
- }
- }
-
- pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
- let mut guard = self.lock.lock().unwrap();
- if guard.disconnected {
- Err(super::TrySendError::Disconnected(t))
- } else if guard.buf.size() == guard.buf.capacity() {
- Err(super::TrySendError::Full(t))
- } else if guard.cap == 0 {
- // With capacity 0, even though we have buffer space we can't
- // transfer the data unless there's a receiver waiting.
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- NoneBlocked => Err(super::TrySendError::Full(t)),
- BlockedSender(..) => unreachable!(),
- BlockedReceiver(token) => {
- guard.buf.enqueue(t);
- wakeup(token, guard);
- Ok(())
- }
- }
- } else {
- // If the buffer has some space and the capacity isn't 0, then we
- // just enqueue the data for later retrieval, ensuring to wake up
- // any blocked receiver if there is one.
- assert!(guard.buf.size() < guard.buf.capacity());
- guard.buf.enqueue(t);
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- BlockedReceiver(token) => wakeup(token, guard),
- NoneBlocked => {}
- BlockedSender(..) => unreachable!(),
- }
- Ok(())
- }
- }
-
- // Receives a message from this channel
- //
- // When reading this, remember that there can only ever be one receiver at
- // time.
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
- let mut guard = self.lock.lock().unwrap();
-
- let mut woke_up_after_waiting = false;
- // Wait for the buffer to have something in it. No need for a
- // while loop because we're the only receiver.
- if !guard.disconnected && guard.buf.size() == 0 {
- if let Some(deadline) = deadline {
- guard =
- wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting);
- } else {
- guard = wait(&self.lock, guard, BlockedReceiver);
- woke_up_after_waiting = true;
- }
- }
-
- // N.B., channel could be disconnected while waiting, so the order of
- // these conditionals is important.
- if guard.disconnected && guard.buf.size() == 0 {
- return Err(Disconnected);
- }
-
- // Pick up the data, wake up our neighbors, and carry on
- assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting));
-
- if guard.buf.size() == 0 {
- return Err(Empty);
- }
-
- let ret = guard.buf.dequeue();
- self.wakeup_senders(woke_up_after_waiting, guard);
- Ok(ret)
- }
-
- pub fn try_recv(&self) -> Result<T, Failure> {
- let mut guard = self.lock.lock().unwrap();
-
- // Easy cases first
- if guard.disconnected && guard.buf.size() == 0 {
- return Err(Disconnected);
- }
- if guard.buf.size() == 0 {
- return Err(Empty);
- }
-
- // Be sure to wake up neighbors
- let ret = Ok(guard.buf.dequeue());
- self.wakeup_senders(false, guard);
- ret
- }
-
- // Wake up pending senders after some data has been received
- //
- // * `waited` - flag if the receiver blocked to receive some data, or if it
- // just picked up some data on the way out
- // * `guard` - the lock guard that is held over this channel's lock
- fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) {
- let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
-
- // If this is a no-buffer channel (cap == 0), then if we didn't wait we
- // need to ACK the sender. If we waited, then the sender waking us up
- // was already the ACK.
- let pending_sender2 = if guard.cap == 0 && !waited {
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- NoneBlocked => None,
- BlockedReceiver(..) => unreachable!(),
- BlockedSender(token) => {
- guard.canceled.take();
- Some(token)
- }
- }
- } else {
- None
- };
- mem::drop(guard);
-
- // only outside of the lock do we wake up the pending threads
- if let Some(token) = pending_sender1 {
- token.signal();
- }
- if let Some(token) = pending_sender2 {
- token.signal();
- }
- }
-
- // Prepares this shared packet for a channel clone, essentially just bumping
- // a refcount.
- pub fn clone_chan(&self) {
- let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
-
- // See comments on Arc::clone() on why we do this (for `mem::forget`).
- if old_count > MAX_REFCOUNT {
- abort();
- }
- }
-
- pub fn drop_chan(&self) {
- // Only flag the channel as disconnected if we're the last channel
- match self.channels.fetch_sub(1, Ordering::SeqCst) {
- 1 => {}
- _ => return,
- }
-
- // Not much to do other than wake up a receiver if one's there
- let mut guard = self.lock.lock().unwrap();
- if guard.disconnected {
- return;
- }
- guard.disconnected = true;
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- NoneBlocked => {}
- BlockedSender(..) => unreachable!(),
- BlockedReceiver(token) => wakeup(token, guard),
- }
- }
-
- pub fn drop_port(&self) {
- let mut guard = self.lock.lock().unwrap();
-
- if guard.disconnected {
- return;
- }
- guard.disconnected = true;
-
- // If the capacity is 0, then the sender may want its data back after
- // we're disconnected. Otherwise it's now our responsibility to destroy
- // the buffered data. As with many other portions of this code, this
- // needs to be careful to destroy the data *outside* of the lock to
- // prevent deadlock.
- let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() };
- let mut queue =
- mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() });
-
- let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
- NoneBlocked => None,
- BlockedSender(token) => {
- *guard.canceled.take().unwrap() = true;
- Some(token)
- }
- BlockedReceiver(..) => unreachable!(),
- };
- mem::drop(guard);
-
- while let Some(token) = queue.dequeue() {
- token.signal();
- }
- if let Some(token) = waiter {
- token.signal();
- }
- }
-}
-
-impl<T> Drop for Packet<T> {
- fn drop(&mut self) {
- assert_eq!(self.channels.load(Ordering::SeqCst), 0);
- let mut guard = self.lock.lock().unwrap();
- assert!(guard.queue.dequeue().is_none());
- assert!(guard.canceled.is_none());
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Buffer, a simple ring buffer backed by Vec<T>
-////////////////////////////////////////////////////////////////////////////////
-
-impl<T> Buffer<T> {
- fn enqueue(&mut self, t: T) {
- let pos = (self.start + self.size) % self.buf.len();
- self.size += 1;
- let prev = mem::replace(&mut self.buf[pos], Some(t));
- assert!(prev.is_none());
- }
-
- fn dequeue(&mut self) -> T {
- let start = self.start;
- self.size -= 1;
- self.start = (self.start + 1) % self.buf.len();
- let result = &mut self.buf[start];
- result.take().unwrap()
- }
-
- fn size(&self) -> usize {
- self.size
- }
- fn capacity(&self) -> usize {
- self.buf.len()
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Queue, a simple queue to enqueue threads with (stack-allocated nodes)
-////////////////////////////////////////////////////////////////////////////////
-
-impl Queue {
- fn enqueue(&mut self, node: &mut Node) -> WaitToken {
- let (wait_token, signal_token) = blocking::tokens();
- node.token = Some(signal_token);
- node.next = ptr::null_mut();
-
- if self.tail.is_null() {
- self.head = node as *mut Node;
- self.tail = node as *mut Node;
- } else {
- unsafe {
- (*self.tail).next = node as *mut Node;
- self.tail = node as *mut Node;
- }
- }
-
- wait_token
- }
-
- fn dequeue(&mut self) -> Option<SignalToken> {
- if self.head.is_null() {
- return None;
- }
- let node = self.head;
- self.head = unsafe { (*node).next };
- if self.head.is_null() {
- self.tail = ptr::null_mut();
- }
- unsafe {
- (*node).next = ptr::null_mut();
- Some((*node).token.take().unwrap())
- }
- }
-}