diff options
| author | bors <bors@rust-lang.org> | 2020-07-27 17:39:01 +0000 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2020-07-27 17:39:01 +0000 |
| commit | 54e000891ffccd4cbfb92146b92736c83085df63 (patch) | |
| tree | 1200bb13eb9ae22def4c43bc657bc56da8faedc6 /library/std/src/sync | |
| parent | 4a90e36c85336d1d4b209556c1a9733210bbff19 (diff) | |
| parent | 6d9705220fec4553d693a7c19d99496e14c89edf (diff) | |
| download | rust-tmp-nightly.tar.gz | |
Auto merge of #73265 - mark-i-m:mv-std, r=<try>tmp-nightly
mv std libs to library/
This is the first step in refactoring the directory layout of this repository, with further followup steps planned (but not done yet).
Background: currently, all crates are under src/, without nested src directories and with the unconventional `lib*` prefixes (e.g., `src/libcore/lib.rs`). This directory structures is not idiomatic and makes the `src/` directory rather overwhelming. To improve contributor experience and make things a bit more approachable, we are reorganizing the repo a bit.
In this PR, we move the standard libs (basically anything that is "runtime", as opposed to part of the compiler, build system, or one of the tools, etc). The new layout moves these libraries to a new `library/` directory in the root of the repo. Additionally, we remove the `lib*` prefixes and add nested `src/` directories. The other crates/tools in this repo are not touched. So in summary:
```
library/<crate>/src/*.rs
src/<all the rest> // unchanged
```
where `<crate>` is:
- core
- alloc
- std
- test
- proc_macro
- panic_abort
- panic_unwind
- profiler_builtins
- term
- unwind
- rtstartup
- backtrace
- rustc-std-workspace-*
There was a lot of discussion about this and a few rounds of compiler team approvals, FCPs, MCPs, and nominations. The original MCP is https://github.com/rust-lang/compiler-team/issues/298. The final approval of the compiler team was given here: https://github.com/rust-lang/rust/pull/73265#issuecomment-659498446.
The name `library` was chosen to complement a later move of the compiler crates to a `compiler/` directory. There was a lot of discussion around adding the nested `src/` directories. Note that this does increase the nesting depth (plausibly important for manual traversal of the tree, e.g., through GitHub's UI or `cd`), but this is deemed to be better as it fits the standard layout of Rust crates throughout most of the ecosystem, though there is some debate about how much this should apply to multi-crate projects. Overall, there seem to be more people in favor of nested `src/` than against.
After this PR, there are no dependencies out of the `library/` directory except on the `build_helper` (or crates.io crates).
Diffstat (limited to 'library/std/src/sync')
| -rw-r--r-- | library/std/src/sync/barrier.rs | 215 | ||||
| -rw-r--r-- | library/std/src/sync/condvar.rs | 818 | ||||
| -rw-r--r-- | library/std/src/sync/mod.rs | 179 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/blocking.rs | 79 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/cache_aligned.rs | 27 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/mod.rs | 3033 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/mpsc_queue.rs | 165 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/oneshot.rs | 307 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/shared.rs | 489 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/spsc_queue.rs | 338 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/stream.rs | 453 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/sync.rs | 495 | ||||
| -rw-r--r-- | library/std/src/sync/mutex.rs | 767 | ||||
| -rw-r--r-- | library/std/src/sync/once.rs | 690 | ||||
| -rw-r--r-- | library/std/src/sync/rwlock.rs | 799 |
15 files changed, 8854 insertions, 0 deletions
diff --git a/library/std/src/sync/barrier.rs b/library/std/src/sync/barrier.rs new file mode 100644 index 00000000000..01314370ce3 --- /dev/null +++ b/library/std/src/sync/barrier.rs @@ -0,0 +1,215 @@ +use crate::fmt; +use crate::sync::{Condvar, Mutex}; + +/// A barrier enables multiple threads to synchronize the beginning +/// of some computation. +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Arc, Barrier}; +/// use std::thread; +/// +/// let mut handles = Vec::with_capacity(10); +/// let barrier = Arc::new(Barrier::new(10)); +/// for _ in 0..10 { +/// let c = barrier.clone(); +/// // The same messages will be printed together. +/// // You will NOT see any interleaving. +/// handles.push(thread::spawn(move|| { +/// println!("before wait"); +/// c.wait(); +/// println!("after wait"); +/// })); +/// } +/// // Wait for other threads to finish. +/// for handle in handles { +/// handle.join().unwrap(); +/// } +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct Barrier { + lock: Mutex<BarrierState>, + cvar: Condvar, + num_threads: usize, +} + +// The inner state of a double barrier +struct BarrierState { + count: usize, + generation_id: usize, +} + +/// A `BarrierWaitResult` is returned by [`wait`] when all threads in the [`Barrier`] +/// have rendezvoused. +/// +/// [`wait`]: struct.Barrier.html#method.wait +/// [`Barrier`]: struct.Barrier.html +/// +/// # Examples +/// +/// ``` +/// use std::sync::Barrier; +/// +/// let barrier = Barrier::new(1); +/// let barrier_wait_result = barrier.wait(); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct BarrierWaitResult(bool); + +#[stable(feature = "std_debug", since = "1.16.0")] +impl fmt::Debug for Barrier { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Barrier { .. }") + } +} + +impl Barrier { + /// Creates a new barrier that can block a given number of threads. + /// + /// A barrier will block `n`-1 threads which call [`wait`] and then wake up + /// all threads at once when the `n`th thread calls [`wait`]. + /// + /// [`wait`]: #method.wait + /// + /// # Examples + /// + /// ``` + /// use std::sync::Barrier; + /// + /// let barrier = Barrier::new(10); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn new(n: usize) -> Barrier { + Barrier { + lock: Mutex::new(BarrierState { count: 0, generation_id: 0 }), + cvar: Condvar::new(), + num_threads: n, + } + } + + /// Blocks the current thread until all threads have rendezvoused here. + /// + /// Barriers are re-usable after all threads have rendezvoused once, and can + /// be used continuously. + /// + /// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that + /// returns `true` from [`is_leader`] when returning from this function, and + /// all other threads will receive a result that will return `false` from + /// [`is_leader`]. + /// + /// [`BarrierWaitResult`]: struct.BarrierWaitResult.html + /// [`is_leader`]: struct.BarrierWaitResult.html#method.is_leader + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Barrier}; + /// use std::thread; + /// + /// let mut handles = Vec::with_capacity(10); + /// let barrier = Arc::new(Barrier::new(10)); + /// for _ in 0..10 { + /// let c = barrier.clone(); + /// // The same messages will be printed together. + /// // You will NOT see any interleaving. + /// handles.push(thread::spawn(move|| { + /// println!("before wait"); + /// c.wait(); + /// println!("after wait"); + /// })); + /// } + /// // Wait for other threads to finish. + /// for handle in handles { + /// handle.join().unwrap(); + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn wait(&self) -> BarrierWaitResult { + let mut lock = self.lock.lock().unwrap(); + let local_gen = lock.generation_id; + lock.count += 1; + if lock.count < self.num_threads { + // We need a while loop to guard against spurious wakeups. + // http://en.wikipedia.org/wiki/Spurious_wakeup + while local_gen == lock.generation_id && lock.count < self.num_threads { + lock = self.cvar.wait(lock).unwrap(); + } + BarrierWaitResult(false) + } else { + lock.count = 0; + lock.generation_id = lock.generation_id.wrapping_add(1); + self.cvar.notify_all(); + BarrierWaitResult(true) + } + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl fmt::Debug for BarrierWaitResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BarrierWaitResult").field("is_leader", &self.is_leader()).finish() + } +} + +impl BarrierWaitResult { + /// Returns `true` if this thread from [`wait`] is the "leader thread". + /// + /// Only one thread will have `true` returned from their result, all other + /// threads will have `false` returned. + /// + /// [`wait`]: struct.Barrier.html#method.wait + /// + /// # Examples + /// + /// ``` + /// use std::sync::Barrier; + /// + /// let barrier = Barrier::new(1); + /// let barrier_wait_result = barrier.wait(); + /// println!("{:?}", barrier_wait_result.is_leader()); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn is_leader(&self) -> bool { + self.0 + } +} + +#[cfg(test)] +mod tests { + use crate::sync::mpsc::{channel, TryRecvError}; + use crate::sync::{Arc, Barrier}; + use crate::thread; + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn test_barrier() { + const N: usize = 10; + + let barrier = Arc::new(Barrier::new(N)); + let (tx, rx) = channel(); + + for _ in 0..N - 1 { + let c = barrier.clone(); + let tx = tx.clone(); + thread::spawn(move || { + tx.send(c.wait().is_leader()).unwrap(); + }); + } + + // At this point, all spawned threads should be blocked, + // so we shouldn't get anything from the port + assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty))); + + let mut leader_found = barrier.wait().is_leader(); + + // Now, the barrier is cleared and we should get data. + for _ in 0..N - 1 { + if rx.recv().unwrap() { + assert!(!leader_found); + leader_found = true; + } + } + assert!(leader_found); + } +} diff --git a/library/std/src/sync/condvar.rs b/library/std/src/sync/condvar.rs new file mode 100644 index 00000000000..9b90bfd68b5 --- /dev/null +++ b/library/std/src/sync/condvar.rs @@ -0,0 +1,818 @@ +use crate::fmt; +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::{mutex, MutexGuard, PoisonError}; +use crate::sys_common::condvar as sys; +use crate::sys_common::mutex as sys_mutex; +use crate::sys_common::poison::{self, LockResult}; +use crate::time::{Duration, Instant}; + +/// A type indicating whether a timed wait on a condition variable returned +/// due to a time out or not. +/// +/// It is returned by the [`wait_timeout`] method. +/// +/// [`wait_timeout`]: struct.Condvar.html#method.wait_timeout +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +#[stable(feature = "wait_timeout", since = "1.5.0")] +pub struct WaitTimeoutResult(bool); + +impl WaitTimeoutResult { + /// Returns `true` if the wait was known to have timed out. + /// + /// # Examples + /// + /// This example spawns a thread which will update the boolean value and + /// then wait 100 milliseconds before notifying the condvar. + /// + /// The main thread will wait with a timeout on the condvar and then leave + /// once the boolean has been updated and notified. + /// + /// ``` + /// use std::sync::{Arc, Condvar, Mutex}; + /// use std::thread; + /// use std::time::Duration; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move || { + /// let (lock, cvar) = &*pair2; + /// + /// // Let's wait 20 milliseconds before notifying the condvar. + /// thread::sleep(Duration::from_millis(20)); + /// + /// let mut started = lock.lock().unwrap(); + /// // We update the boolean value. + /// *started = true; + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// loop { + /// // Let's put a timeout on the condvar's wait. + /// let result = cvar.wait_timeout(started, Duration::from_millis(10)).unwrap(); + /// // 10 milliseconds have passed, or maybe the value changed! + /// started = result.0; + /// if *started == true { + /// // We received the notification and the value has been updated, we can leave. + /// break + /// } + /// } + /// ``` + #[stable(feature = "wait_timeout", since = "1.5.0")] + pub fn timed_out(&self) -> bool { + self.0 + } +} + +/// A Condition Variable +/// +/// Condition variables represent the ability to block a thread such that it +/// consumes no CPU time while waiting for an event to occur. Condition +/// variables are typically associated with a boolean predicate (a condition) +/// and a mutex. The predicate is always verified inside of the mutex before +/// determining that a thread must block. +/// +/// Functions in this module will block the current **thread** of execution and +/// are bindings to system-provided condition variables where possible. Note +/// that this module places one additional restriction over the system condition +/// variables: each condvar can be used with precisely one mutex at runtime. Any +/// attempt to use multiple mutexes on the same condition variable will result +/// in a runtime panic. If this is not desired, then the unsafe primitives in +/// `sys` do not have this restriction but may result in undefined behavior. +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Arc, Mutex, Condvar}; +/// use std::thread; +/// +/// let pair = Arc::new((Mutex::new(false), Condvar::new())); +/// let pair2 = pair.clone(); +/// +/// // Inside of our lock, spawn a new thread, and then wait for it to start. +/// thread::spawn(move|| { +/// let (lock, cvar) = &*pair2; +/// let mut started = lock.lock().unwrap(); +/// *started = true; +/// // We notify the condvar that the value has changed. +/// cvar.notify_one(); +/// }); +/// +/// // Wait for the thread to start up. +/// let (lock, cvar) = &*pair; +/// let mut started = lock.lock().unwrap(); +/// while !*started { +/// started = cvar.wait(started).unwrap(); +/// } +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct Condvar { + inner: Box<sys::Condvar>, + mutex: AtomicUsize, +} + +impl Condvar { + /// Creates a new condition variable which is ready to be waited on and + /// notified. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Condvar; + /// + /// let condvar = Condvar::new(); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn new() -> Condvar { + let mut c = Condvar { inner: box sys::Condvar::new(), mutex: AtomicUsize::new(0) }; + unsafe { + c.inner.init(); + } + c + } + + /// Blocks the current thread until this condition variable receives a + /// notification. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `guard`) and block the current thread. This means that any calls + /// to [`notify_one`] or [`notify_all`] which happen logically after the + /// mutex is unlocked are candidates to wake this thread up. When this + /// function call returns, the lock specified will have been re-acquired. + /// + /// Note that this function is susceptible to spurious wakeups. Condition + /// variables normally have a boolean predicate associated with them, and + /// the predicate must always be checked each time this function returns to + /// protect against spurious wakeups. + /// + /// # Errors + /// + /// This function will return an error if the mutex being waited on is + /// poisoned when this thread re-acquires the lock. For more information, + /// see information about [poisoning] on the [`Mutex`] type. + /// + /// # Panics + /// + /// This function will [`panic!`] if it is used with more than one mutex + /// over time. Each condition variable is dynamically bound to exactly one + /// mutex to ensure defined behavior across platforms. If this functionality + /// is not desired, then unsafe primitives in `sys` are provided. + /// + /// [`notify_one`]: #method.notify_one + /// [`notify_all`]: #method.notify_all + /// [poisoning]: ../sync/struct.Mutex.html#poisoning + /// [`Mutex`]: ../sync/struct.Mutex.html + /// [`panic!`]: ../../std/macro.panic.html + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// // As long as the value inside the `Mutex<bool>` is `false`, we wait. + /// while !*started { + /// started = cvar.wait(started).unwrap(); + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> LockResult<MutexGuard<'a, T>> { + let poisoned = unsafe { + let lock = mutex::guard_lock(&guard); + self.verify(lock); + self.inner.wait(lock); + mutex::guard_poison(&guard).get() + }; + if poisoned { Err(PoisonError::new(guard)) } else { Ok(guard) } + } + + /// Blocks the current thread until this condition variable receives a + /// notification and the provided condition is false. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `guard`) and block the current thread. This means that any calls + /// to [`notify_one`] or [`notify_all`] which happen logically after the + /// mutex is unlocked are candidates to wake this thread up. When this + /// function call returns, the lock specified will have been re-acquired. + /// + /// # Errors + /// + /// This function will return an error if the mutex being waited on is + /// poisoned when this thread re-acquires the lock. For more information, + /// see information about [poisoning] on the [`Mutex`] type. + /// + /// [`notify_one`]: #method.notify_one + /// [`notify_all`]: #method.notify_all + /// [poisoning]: ../sync/struct.Mutex.html#poisoning + /// [`Mutex`]: ../sync/struct.Mutex.html + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(true), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut pending = lock.lock().unwrap(); + /// *pending = false; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// // As long as the value inside the `Mutex<bool>` is `true`, we wait. + /// let _guard = cvar.wait_while(lock.lock().unwrap(), |pending| { *pending }).unwrap(); + /// ``` + #[stable(feature = "wait_until", since = "1.42.0")] + pub fn wait_while<'a, T, F>( + &self, + mut guard: MutexGuard<'a, T>, + mut condition: F, + ) -> LockResult<MutexGuard<'a, T>> + where + F: FnMut(&mut T) -> bool, + { + while condition(&mut *guard) { + guard = self.wait(guard)?; + } + Ok(guard) + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to [`wait`] + /// except that the thread will be blocked for roughly no longer + /// than `ms` milliseconds. This method should not be used for + /// precise timing due to anomalies such as preemption or platform + /// differences that may not cause the maximum amount of time + /// waited to be precisely `ms`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned boolean is `false` only if the timeout is known + /// to have elapsed. + /// + /// Like [`wait`], the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + /// + /// [`wait`]: #method.wait + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// // As long as the value inside the `Mutex<bool>` is `false`, we wait. + /// loop { + /// let result = cvar.wait_timeout_ms(started, 10).unwrap(); + /// // 10 milliseconds have passed, or maybe the value changed! + /// started = result.0; + /// if *started == true { + /// // We received the notification and the value has been updated, we can leave. + /// break + /// } + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + #[rustc_deprecated(since = "1.6.0", reason = "replaced by `std::sync::Condvar::wait_timeout`")] + pub fn wait_timeout_ms<'a, T>( + &self, + guard: MutexGuard<'a, T>, + ms: u32, + ) -> LockResult<(MutexGuard<'a, T>, bool)> { + let res = self.wait_timeout(guard, Duration::from_millis(ms as u64)); + poison::map_result(res, |(a, b)| (a, !b.timed_out())) + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to [`wait`] except that + /// the thread will be blocked for roughly no longer than `dur`. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `dur`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. This function is susceptible to spurious wakeups. + /// Condition variables normally have a boolean predicate associated with + /// them, and the predicate must always be checked each time this function + /// returns to protect against spurious wakeups. Additionally, it is + /// typically desirable for the timeout to not exceed some duration in + /// spite of spurious wakes, thus the sleep-duration is decremented by the + /// amount slept. Alternatively, use the `wait_timeout_while` method + /// to wait with a timeout while a predicate is true. + /// + /// The returned [`WaitTimeoutResult`] value indicates if the timeout is + /// known to have elapsed. + /// + /// Like [`wait`], the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + /// + /// [`wait`]: #method.wait + /// [`wait_timeout_while`]: #method.wait_timeout_while + /// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// use std::time::Duration; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // wait for the thread to start up + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// // as long as the value inside the `Mutex<bool>` is `false`, we wait + /// loop { + /// let result = cvar.wait_timeout(started, Duration::from_millis(10)).unwrap(); + /// // 10 milliseconds have passed, or maybe the value changed! + /// started = result.0; + /// if *started == true { + /// // We received the notification and the value has been updated, we can leave. + /// break + /// } + /// } + /// ``` + #[stable(feature = "wait_timeout", since = "1.5.0")] + pub fn wait_timeout<'a, T>( + &self, + guard: MutexGuard<'a, T>, + dur: Duration, + ) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> { + let (poisoned, result) = unsafe { + let lock = mutex::guard_lock(&guard); + self.verify(lock); + let success = self.inner.wait_timeout(lock, dur); + (mutex::guard_poison(&guard).get(), WaitTimeoutResult(!success)) + }; + if poisoned { Err(PoisonError::new((guard, result))) } else { Ok((guard, result)) } + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to [`wait_while`] except + /// that the thread will be blocked for roughly no longer than `dur`. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `dur`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned [`WaitTimeoutResult`] value indicates if the timeout is + /// known to have elapsed without the condition being met. + /// + /// Like [`wait_while`], the lock specified will be re-acquired when this + /// function returns, regardless of whether the timeout elapsed or not. + /// + /// [`wait_while`]: #method.wait_while + /// [`wait_timeout`]: #method.wait_timeout + /// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// use std::time::Duration; + /// + /// let pair = Arc::new((Mutex::new(true), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut pending = lock.lock().unwrap(); + /// *pending = false; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // wait for the thread to start up + /// let (lock, cvar) = &*pair; + /// let result = cvar.wait_timeout_while( + /// lock.lock().unwrap(), + /// Duration::from_millis(100), + /// |&mut pending| pending, + /// ).unwrap(); + /// if result.1.timed_out() { + /// // timed-out without the condition ever evaluating to false. + /// } + /// // access the locked mutex via result.0 + /// ``` + #[stable(feature = "wait_timeout_until", since = "1.42.0")] + pub fn wait_timeout_while<'a, T, F>( + &self, + mut guard: MutexGuard<'a, T>, + dur: Duration, + mut condition: F, + ) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> + where + F: FnMut(&mut T) -> bool, + { + let start = Instant::now(); + loop { + if !condition(&mut *guard) { + return Ok((guard, WaitTimeoutResult(false))); + } + let timeout = match dur.checked_sub(start.elapsed()) { + Some(timeout) => timeout, + None => return Ok((guard, WaitTimeoutResult(true))), + }; + guard = self.wait_timeout(guard, timeout)?.0; + } + } + + /// Wakes up one blocked thread on this condvar. + /// + /// If there is a blocked thread on this condition variable, then it will + /// be woken up from its call to [`wait`] or [`wait_timeout`]. Calls to + /// `notify_one` are not buffered in any way. + /// + /// To wake up all threads, see [`notify_all`]. + /// + /// [`wait`]: #method.wait + /// [`wait_timeout`]: #method.wait_timeout + /// [`notify_all`]: #method.notify_all + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// // As long as the value inside the `Mutex<bool>` is `false`, we wait. + /// while !*started { + /// started = cvar.wait(started).unwrap(); + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn notify_one(&self) { + unsafe { self.inner.notify_one() } + } + + /// Wakes up all blocked threads on this condvar. + /// + /// This method will ensure that any current waiters on the condition + /// variable are awoken. Calls to `notify_all()` are not buffered in any + /// way. + /// + /// To wake up only one thread, see [`notify_one`]. + /// + /// [`notify_one`]: #method.notify_one + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = pair.clone(); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_all(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// // As long as the value inside the `Mutex<bool>` is `false`, we wait. + /// while !*started { + /// started = cvar.wait(started).unwrap(); + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn notify_all(&self) { + unsafe { self.inner.notify_all() } + } + + fn verify(&self, mutex: &sys_mutex::Mutex) { + let addr = mutex as *const _ as usize; + match self.mutex.compare_and_swap(0, addr, Ordering::SeqCst) { + // If we got out 0, then we have successfully bound the mutex to + // this cvar. + 0 => {} + + // If we get out a value that's the same as `addr`, then someone + // already beat us to the punch. + n if n == addr => {} + + // Anything else and we're using more than one mutex on this cvar, + // which is currently disallowed. + _ => panic!( + "attempted to use a condition variable with two \ + mutexes" + ), + } + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl fmt::Debug for Condvar { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Condvar { .. }") + } +} + +#[stable(feature = "condvar_default", since = "1.10.0")] +impl Default for Condvar { + /// Creates a `Condvar` which is ready to be waited on and notified. + fn default() -> Condvar { + Condvar::new() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Drop for Condvar { + fn drop(&mut self) { + unsafe { self.inner.destroy() } + } +} + +#[cfg(test)] +mod tests { + use crate::sync::atomic::{AtomicBool, Ordering}; + use crate::sync::mpsc::channel; + use crate::sync::{Arc, Condvar, Mutex}; + use crate::thread; + use crate::time::Duration; + + #[test] + fn smoke() { + let c = Condvar::new(); + c.notify_one(); + c.notify_all(); + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn notify_one() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let g = m.lock().unwrap(); + let _t = thread::spawn(move || { + let _g = m2.lock().unwrap(); + c2.notify_one(); + }); + let g = c.wait(g).unwrap(); + drop(g); + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn notify_all() { + const N: usize = 10; + + let data = Arc::new((Mutex::new(0), Condvar::new())); + let (tx, rx) = channel(); + for _ in 0..N { + let data = data.clone(); + let tx = tx.clone(); + thread::spawn(move || { + let &(ref lock, ref cond) = &*data; + let mut cnt = lock.lock().unwrap(); + *cnt += 1; + if *cnt == N { + tx.send(()).unwrap(); + } + while *cnt != 0 { + cnt = cond.wait(cnt).unwrap(); + } + tx.send(()).unwrap(); + }); + } + drop(tx); + + let &(ref lock, ref cond) = &*data; + rx.recv().unwrap(); + let mut cnt = lock.lock().unwrap(); + *cnt = 0; + cond.notify_all(); + drop(cnt); + + for _ in 0..N { + rx.recv().unwrap(); + } + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_while() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair2 = pair.clone(); + + // Inside of our lock, spawn a new thread, and then wait for it to start. + thread::spawn(move || { + let &(ref lock, ref cvar) = &*pair2; + let mut started = lock.lock().unwrap(); + *started = true; + // We notify the condvar that the value has changed. + cvar.notify_one(); + }); + + // Wait for the thread to start up. + let &(ref lock, ref cvar) = &*pair; + let guard = cvar.wait_while(lock.lock().unwrap(), |started| !*started); + assert!(*guard.unwrap()); + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_wait() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + loop { + let g = m.lock().unwrap(); + let (_g, no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap(); + // spurious wakeups mean this isn't necessarily true + // so execute test again, if not timeout + if !no_timeout.timed_out() { + continue; + } + + break; + } + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_while_wait() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock().unwrap(); + let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(1), |_| true).unwrap(); + // no spurious wakeups. ensure it timed-out + assert!(wait.timed_out()); + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_while_instant_satisfy() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock().unwrap(); + let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(0), |_| false).unwrap(); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_while_wake() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair_copy = pair.clone(); + + let &(ref m, ref c) = &*pair; + let g = m.lock().unwrap(); + let _t = thread::spawn(move || { + let &(ref lock, ref cvar) = &*pair_copy; + let mut started = lock.lock().unwrap(); + thread::sleep(Duration::from_millis(1)); + *started = true; + cvar.notify_one(); + }); + let (g2, wait) = c + .wait_timeout_while(g, Duration::from_millis(u64::MAX), |&mut notified| !notified) + .unwrap(); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); + assert!(*g2); + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_wake() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + loop { + let g = m.lock().unwrap(); + + let c2 = c.clone(); + let m2 = m.clone(); + + let notified = Arc::new(AtomicBool::new(false)); + let notified_copy = notified.clone(); + + let t = thread::spawn(move || { + let _g = m2.lock().unwrap(); + thread::sleep(Duration::from_millis(1)); + notified_copy.store(true, Ordering::SeqCst); + c2.notify_one(); + }); + let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u64::MAX)).unwrap(); + assert!(!timeout_res.timed_out()); + // spurious wakeups mean this isn't necessarily true + // so execute test again, if not notified + if !notified.load(Ordering::SeqCst) { + t.join().unwrap(); + continue; + } + drop(g); + + t.join().unwrap(); + + break; + } + } + + #[test] + #[should_panic] + #[cfg_attr(target_os = "emscripten", ignore)] + fn two_mutexes() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock().unwrap(); + let _t = thread::spawn(move || { + let _g = m2.lock().unwrap(); + c2.notify_one(); + }); + g = c.wait(g).unwrap(); + drop(g); + + let m = Mutex::new(()); + let _ = c.wait(m.lock().unwrap()).unwrap(); + } +} diff --git a/library/std/src/sync/mod.rs b/library/std/src/sync/mod.rs new file mode 100644 index 00000000000..b6699910b07 --- /dev/null +++ b/library/std/src/sync/mod.rs @@ -0,0 +1,179 @@ +//! Useful synchronization primitives. +//! +//! ## The need for synchronization +//! +//! Conceptually, a Rust program is a series of operations which will +//! be executed on a computer. The timeline of events happening in the +//! program is consistent with the order of the operations in the code. +//! +//! Consider the following code, operating on some global static variables: +//! +//! ```rust +//! static mut A: u32 = 0; +//! static mut B: u32 = 0; +//! static mut C: u32 = 0; +//! +//! fn main() { +//! unsafe { +//! A = 3; +//! B = 4; +//! A = A + B; +//! C = B; +//! println!("{} {} {}", A, B, C); +//! C = A; +//! } +//! } +//! ``` +//! +//! It appears as if some variables stored in memory are changed, an addition +//! is performed, result is stored in `A` and the variable `C` is +//! modified twice. +//! +//! When only a single thread is involved, the results are as expected: +//! the line `7 4 4` gets printed. +//! +//! As for what happens behind the scenes, when optimizations are enabled the +//! final generated machine code might look very different from the code: +//! +//! - The first store to `C` might be moved before the store to `A` or `B`, +//! _as if_ we had written `C = 4; A = 3; B = 4`. +//! +//! - Assignment of `A + B` to `A` might be removed, since the sum can be stored +//! in a temporary location until it gets printed, with the global variable +//! never getting updated. +//! +//! - The final result could be determined just by looking at the code +//! at compile time, so [constant folding] might turn the whole +//! block into a simple `println!("7 4 4")`. +//! +//! The compiler is allowed to perform any combination of these +//! optimizations, as long as the final optimized code, when executed, +//! produces the same results as the one without optimizations. +//! +//! Due to the [concurrency] involved in modern computers, assumptions +//! about the program's execution order are often wrong. Access to +//! global variables can lead to nondeterministic results, **even if** +//! compiler optimizations are disabled, and it is **still possible** +//! to introduce synchronization bugs. +//! +//! Note that thanks to Rust's safety guarantees, accessing global (static) +//! variables requires `unsafe` code, assuming we don't use any of the +//! synchronization primitives in this module. +//! +//! [constant folding]: https://en.wikipedia.org/wiki/Constant_folding +//! [concurrency]: https://en.wikipedia.org/wiki/Concurrency_(computer_science) +//! +//! ## Out-of-order execution +//! +//! Instructions can execute in a different order from the one we define, due to +//! various reasons: +//! +//! - The **compiler** reordering instructions: If the compiler can issue an +//! instruction at an earlier point, it will try to do so. For example, it +//! might hoist memory loads at the top of a code block, so that the CPU can +//! start [prefetching] the values from memory. +//! +//! In single-threaded scenarios, this can cause issues when writing +//! signal handlers or certain kinds of low-level code. +//! Use [compiler fences] to prevent this reordering. +//! +//! - A **single processor** executing instructions [out-of-order]: +//! Modern CPUs are capable of [superscalar] execution, +//! i.e., multiple instructions might be executing at the same time, +//! even though the machine code describes a sequential process. +//! +//! This kind of reordering is handled transparently by the CPU. +//! +//! - A **multiprocessor** system executing multiple hardware threads +//! at the same time: In multi-threaded scenarios, you can use two +//! kinds of primitives to deal with synchronization: +//! - [memory fences] to ensure memory accesses are made visible to +//! other CPUs in the right order. +//! - [atomic operations] to ensure simultaneous access to the same +//! memory location doesn't lead to undefined behavior. +//! +//! [prefetching]: https://en.wikipedia.org/wiki/Cache_prefetching +//! [compiler fences]: crate::sync::atomic::compiler_fence +//! [out-of-order]: https://en.wikipedia.org/wiki/Out-of-order_execution +//! [superscalar]: https://en.wikipedia.org/wiki/Superscalar_processor +//! [memory fences]: crate::sync::atomic::fence +//! [atomic operations]: crate::sync::atomic +//! +//! ## Higher-level synchronization objects +//! +//! Most of the low-level synchronization primitives are quite error-prone and +//! inconvenient to use, which is why the standard library also exposes some +//! higher-level synchronization objects. +//! +//! These abstractions can be built out of lower-level primitives. +//! For efficiency, the sync objects in the standard library are usually +//! implemented with help from the operating system's kernel, which is +//! able to reschedule the threads while they are blocked on acquiring +//! a lock. +//! +//! The following is an overview of the available synchronization +//! objects: +//! +//! - [`Arc`]: Atomically Reference-Counted pointer, which can be used +//! in multithreaded environments to prolong the lifetime of some +//! data until all the threads have finished using it. +//! +//! - [`Barrier`]: Ensures multiple threads will wait for each other +//! to reach a point in the program, before continuing execution all +//! together. +//! +//! - [`Condvar`]: Condition Variable, providing the ability to block +//! a thread while waiting for an event to occur. +//! +//! - [`mpsc`]: Multi-producer, single-consumer queues, used for +//! message-based communication. Can provide a lightweight +//! inter-thread synchronisation mechanism, at the cost of some +//! extra memory. +//! +//! - [`Mutex`]: Mutual Exclusion mechanism, which ensures that at +//! most one thread at a time is able to access some data. +//! +//! - [`Once`]: Used for thread-safe, one-time initialization of a +//! global variable. +//! +//! - [`RwLock`]: Provides a mutual exclusion mechanism which allows +//! multiple readers at the same time, while allowing only one +//! writer at a time. In some cases, this can be more efficient than +//! a mutex. +//! +//! [`Arc`]: crate::sync::Arc +//! [`Barrier`]: crate::sync::Barrier +//! [`Condvar`]: crate::sync::Condvar +//! [`mpsc`]: crate::sync::mpsc +//! [`Mutex`]: crate::sync::Mutex +//! [`Once`]: crate::sync::Once +//! [`RwLock`]: crate::sync::RwLock + +#![stable(feature = "rust1", since = "1.0.0")] + +#[stable(feature = "rust1", since = "1.0.0")] +pub use alloc_crate::sync::{Arc, Weak}; +#[stable(feature = "rust1", since = "1.0.0")] +pub use core::sync::atomic; + +#[stable(feature = "rust1", since = "1.0.0")] +pub use self::barrier::{Barrier, BarrierWaitResult}; +#[stable(feature = "rust1", since = "1.0.0")] +pub use self::condvar::{Condvar, WaitTimeoutResult}; +#[stable(feature = "rust1", since = "1.0.0")] +pub use self::mutex::{Mutex, MutexGuard}; +#[stable(feature = "rust1", since = "1.0.0")] +#[allow(deprecated)] +pub use self::once::{Once, OnceState, ONCE_INIT}; +#[stable(feature = "rust1", since = "1.0.0")] +pub use self::rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +#[stable(feature = "rust1", since = "1.0.0")] +pub use crate::sys_common::poison::{LockResult, PoisonError, TryLockError, TryLockResult}; + +pub mod mpsc; + +mod barrier; +mod condvar; +mod mutex; +mod once; +mod rwlock; diff --git a/library/std/src/sync/mpsc/blocking.rs b/library/std/src/sync/mpsc/blocking.rs new file mode 100644 index 00000000000..d34de6a4fac --- /dev/null +++ b/library/std/src/sync/mpsc/blocking.rs @@ -0,0 +1,79 @@ +//! Generic support for building blocking abstractions. + +use crate::mem; +use crate::sync::atomic::{AtomicBool, Ordering}; +use crate::sync::Arc; +use crate::thread::{self, Thread}; +use crate::time::Instant; + +struct Inner { + thread: Thread, + woken: AtomicBool, +} + +unsafe impl Send for Inner {} +unsafe impl Sync for Inner {} + +#[derive(Clone)] +pub struct SignalToken { + inner: Arc<Inner>, +} + +pub struct WaitToken { + inner: Arc<Inner>, +} + +impl !Send for WaitToken {} + +impl !Sync for WaitToken {} + +pub fn tokens() -> (WaitToken, SignalToken) { + let inner = Arc::new(Inner { thread: thread::current(), woken: AtomicBool::new(false) }); + let wait_token = WaitToken { inner: inner.clone() }; + let signal_token = SignalToken { inner }; + (wait_token, signal_token) +} + +impl SignalToken { + pub fn signal(&self) -> bool { + let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst); + if wake { + self.inner.thread.unpark(); + } + wake + } + + /// Converts to an unsafe usize value. Useful for storing in a pipe's state + /// flag. + #[inline] + pub unsafe fn cast_to_usize(self) -> usize { + mem::transmute(self.inner) + } + + /// Converts from an unsafe usize value. Useful for retrieving a pipe's state + /// flag. + #[inline] + pub unsafe fn cast_from_usize(signal_ptr: usize) -> SignalToken { + SignalToken { inner: mem::transmute(signal_ptr) } + } +} + +impl WaitToken { + pub fn wait(self) { + while !self.inner.woken.load(Ordering::SeqCst) { + thread::park() + } + } + + /// Returns `true` if we wake up normally. + pub fn wait_max_until(self, end: Instant) -> bool { + while !self.inner.woken.load(Ordering::SeqCst) { + let now = Instant::now(); + if now >= end { + return false; + } + thread::park_timeout(end - now) + } + true + } +} diff --git a/library/std/src/sync/mpsc/cache_aligned.rs b/library/std/src/sync/mpsc/cache_aligned.rs new file mode 100644 index 00000000000..b0842144328 --- /dev/null +++ b/library/std/src/sync/mpsc/cache_aligned.rs @@ -0,0 +1,27 @@ +use crate::ops::{Deref, DerefMut}; + +#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[repr(align(64))] +pub(super) struct Aligner; + +#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(super) struct CacheAligned<T>(pub T, pub Aligner); + +impl<T> Deref for CacheAligned<T> { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<T> DerefMut for CacheAligned<T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl<T> CacheAligned<T> { + pub(super) fn new(t: T) -> Self { + CacheAligned(t, Aligner) + } +} diff --git a/library/std/src/sync/mpsc/mod.rs b/library/std/src/sync/mpsc/mod.rs new file mode 100644 index 00000000000..3ff50e9f213 --- /dev/null +++ b/library/std/src/sync/mpsc/mod.rs @@ -0,0 +1,3033 @@ +// ignore-tidy-filelength + +//! Multi-producer, single-consumer FIFO queue communication primitives. +//! +//! This module provides message-based communication over channels, concretely +//! defined among three types: +//! +//! * [`Sender`] +//! * [`SyncSender`] +//! * [`Receiver`] +//! +//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both +//! senders are clone-able (multi-producer) such that many threads can send +//! simultaneously to one receiver (single-consumer). +//! +//! These channels come in two flavors: +//! +//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function +//! will return a `(Sender, Receiver)` tuple where all sends will be +//! **asynchronous** (they never block). The channel conceptually has an +//! infinite buffer. +//! +//! 2. A synchronous, bounded channel. The [`sync_channel`] function will +//! return a `(SyncSender, Receiver)` tuple where the storage for pending +//! messages is a pre-allocated buffer of a fixed size. All sends will be +//! **synchronous** by blocking until there is buffer space available. Note +//! that a bound of 0 is allowed, causing the channel to become a "rendezvous" +//! channel where each sender atomically hands off a message to a receiver. +//! +//! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html +//! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html +//! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html +//! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send +//! [`channel`]: ../../../std/sync/mpsc/fn.channel.html +//! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html +//! +//! ## Disconnection +//! +//! The send and receive operations on channels will all return a [`Result`] +//! indicating whether the operation succeeded or not. An unsuccessful operation +//! is normally indicative of the other half of a channel having "hung up" by +//! being dropped in its corresponding thread. +//! +//! Once half of a channel has been deallocated, most operations can no longer +//! continue to make progress, so [`Err`] will be returned. Many applications +//! will continue to [`unwrap`] the results returned from this module, +//! instigating a propagation of failure among threads if one unexpectedly dies. +//! +//! [`Result`]: ../../../std/result/enum.Result.html +//! [`Err`]: ../../../std/result/enum.Result.html#variant.Err +//! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap +//! +//! # Examples +//! +//! Simple usage: +//! +//! ``` +//! use std::thread; +//! use std::sync::mpsc::channel; +//! +//! // Create a simple streaming channel +//! let (tx, rx) = channel(); +//! thread::spawn(move|| { +//! tx.send(10).unwrap(); +//! }); +//! assert_eq!(rx.recv().unwrap(), 10); +//! ``` +//! +//! Shared usage: +//! +//! ``` +//! use std::thread; +//! use std::sync::mpsc::channel; +//! +//! // Create a shared channel that can be sent along from many threads +//! // where tx is the sending half (tx for transmission), and rx is the receiving +//! // half (rx for receiving). +//! let (tx, rx) = channel(); +//! for i in 0..10 { +//! let tx = tx.clone(); +//! thread::spawn(move|| { +//! tx.send(i).unwrap(); +//! }); +//! } +//! +//! for _ in 0..10 { +//! let j = rx.recv().unwrap(); +//! assert!(0 <= j && j < 10); +//! } +//! ``` +//! +//! Propagating panics: +//! +//! ``` +//! use std::sync::mpsc::channel; +//! +//! // The call to recv() will return an error because the channel has already +//! // hung up (or been deallocated) +//! let (tx, rx) = channel::<i32>(); +//! drop(tx); +//! assert!(rx.recv().is_err()); +//! ``` +//! +//! Synchronous channels: +//! +//! ``` +//! use std::thread; +//! use std::sync::mpsc::sync_channel; +//! +//! let (tx, rx) = sync_channel::<i32>(0); +//! thread::spawn(move|| { +//! // This will wait for the parent thread to start receiving +//! tx.send(53).unwrap(); +//! }); +//! rx.recv().unwrap(); +//! ``` + +#![stable(feature = "rust1", since = "1.0.0")] + +// A description of how Rust's channel implementation works +// +// Channels are supposed to be the basic building block for all other +// concurrent primitives that are used in Rust. As a result, the channel type +// needs to be highly optimized, flexible, and broad enough for use everywhere. +// +// The choice of implementation of all channels is to be built on lock-free data +// structures. The channels themselves are then consequently also lock-free data +// structures. As always with lock-free code, this is a very "here be dragons" +// territory, especially because I'm unaware of any academic papers that have +// gone into great length about channels of these flavors. +// +// ## Flavors of channels +// +// From the perspective of a consumer of this library, there is only one flavor +// of channel. This channel can be used as a stream and cloned to allow multiple +// senders. Under the hood, however, there are actually three flavors of +// channels in play. +// +// * Flavor::Oneshots - these channels are highly optimized for the one-send use +// case. They contain as few atomics as possible and +// involve one and exactly one allocation. +// * Streams - these channels are optimized for the non-shared use case. They +// use a different concurrent queue that is more tailored for this +// use case. The initial allocation of this flavor of channel is not +// optimized. +// * Shared - this is the most general form of channel that this module offers, +// a channel with multiple senders. This type is as optimized as it +// can be, but the previous two types mentioned are much faster for +// their use-cases. +// +// ## Concurrent queues +// +// The basic idea of Rust's Sender/Receiver types is that send() never blocks, +// but recv() obviously blocks. This means that under the hood there must be +// some shared and concurrent queue holding all of the actual data. +// +// With two flavors of channels, two flavors of queues are also used. We have +// chosen to use queues from a well-known author that are abbreviated as SPSC +// and MPSC (single producer, single consumer and multiple producer, single +// consumer). SPSC queues are used for streams while MPSC queues are used for +// shared channels. +// +// ### SPSC optimizations +// +// The SPSC queue found online is essentially a linked list of nodes where one +// half of the nodes are the "queue of data" and the other half of nodes are a +// cache of unused nodes. The unused nodes are used such that an allocation is +// not required on every push() and a free doesn't need to happen on every +// pop(). +// +// As found online, however, the cache of nodes is of an infinite size. This +// means that if a channel at one point in its life had 50k items in the queue, +// then the queue will always have the capacity for 50k items. I believed that +// this was an unnecessary limitation of the implementation, so I have altered +// the queue to optionally have a bound on the cache size. +// +// By default, streams will have an unbounded SPSC queue with a small-ish cache +// size. The hope is that the cache is still large enough to have very fast +// send() operations while not too large such that millions of channels can +// coexist at once. +// +// ### MPSC optimizations +// +// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses +// a linked list under the hood to earn its unboundedness, but I have not put +// forth much effort into having a cache of nodes similar to the SPSC queue. +// +// For now, I believe that this is "ok" because shared channels are not the most +// common type, but soon we may wish to revisit this queue choice and determine +// another candidate for backend storage of shared channels. +// +// ## Overview of the Implementation +// +// Now that there's a little background on the concurrent queues used, it's +// worth going into much more detail about the channels themselves. The basic +// pseudocode for a send/recv are: +// +// +// send(t) recv() +// queue.push(t) return if queue.pop() +// if increment() == -1 deschedule { +// wakeup() if decrement() > 0 +// cancel_deschedule() +// } +// queue.pop() +// +// As mentioned before, there are no locks in this implementation, only atomic +// instructions are used. +// +// ### The internal atomic counter +// +// Every channel has a shared counter with each half to keep track of the size +// of the queue. This counter is used to abort descheduling by the receiver and +// to know when to wake up on the sending side. +// +// As seen in the pseudocode, senders will increment this count and receivers +// will decrement the count. The theory behind this is that if a sender sees a +// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count, +// then it doesn't need to block. +// +// The recv() method has a beginning call to pop(), and if successful, it needs +// to decrement the count. It is a crucial implementation detail that this +// decrement does *not* happen to the shared counter. If this were the case, +// then it would be possible for the counter to be very negative when there were +// no receivers waiting, in which case the senders would have to determine when +// it was actually appropriate to wake up a receiver. +// +// Instead, the "steal count" is kept track of separately (not atomically +// because it's only used by receivers), and then the decrement() call when +// descheduling will lump in all of the recent steals into one large decrement. +// +// The implication of this is that if a sender sees a -1 count, then there's +// guaranteed to be a waiter waiting! +// +// ## Native Implementation +// +// A major goal of these channels is to work seamlessly on and off the runtime. +// All of the previous race conditions have been worded in terms of +// scheduler-isms (which is obviously not available without the runtime). +// +// For now, native usage of channels (off the runtime) will fall back onto +// mutexes/cond vars for descheduling/atomic decisions. The no-contention path +// is still entirely lock-free, the "deschedule" blocks above are surrounded by +// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a +// condition variable. +// +// ## Select +// +// Being able to support selection over channels has greatly influenced this +// design, and not only does selection need to work inside the runtime, but also +// outside the runtime. +// +// The implementation is fairly straightforward. The goal of select() is not to +// return some data, but only to return which channel can receive data without +// blocking. The implementation is essentially the entire blocking procedure +// followed by an increment as soon as its woken up. The cancellation procedure +// involves an increment and swapping out of to_wake to acquire ownership of the +// thread to unblock. +// +// Sadly this current implementation requires multiple allocations, so I have +// seen the throughput of select() be much worse than it should be. I do not +// believe that there is anything fundamental that needs to change about these +// channels, however, in order to support a more efficient select(). +// +// FIXME: Select is now removed, so these factors are ready to be cleaned up! +// +// # Conclusion +// +// And now that you've seen all the races that I found and attempted to fix, +// here's the code for you to find some more! + +use crate::cell::UnsafeCell; +use crate::error; +use crate::fmt; +use crate::mem; +use crate::sync::Arc; +use crate::time::{Duration, Instant}; + +mod blocking; +mod mpsc_queue; +mod oneshot; +mod shared; +mod spsc_queue; +mod stream; +mod sync; + +mod cache_aligned; + +/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type. +/// This half can only be owned by one thread. +/// +/// Messages sent to the channel can be retrieved using [`recv`]. +/// +/// [`channel`]: fn.channel.html +/// [`sync_channel`]: fn.sync_channel.html +/// [`recv`]: struct.Receiver.html#method.recv +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// use std::time::Duration; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send("Hello world!").unwrap(); +/// thread::sleep(Duration::from_secs(2)); // block for two seconds +/// send.send("Delayed for 2 seconds").unwrap(); +/// }); +/// +/// println!("{}", recv.recv().unwrap()); // Received immediately +/// println!("Waiting..."); +/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct Receiver<T> { + inner: UnsafeCell<Flavor<T>>, +} + +// The receiver port can be sent from place to place, so long as it +// is not used to receive non-sendable things. +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl<T: Send> Send for Receiver<T> {} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> !Sync for Receiver<T> {} + +/// An iterator over messages on a [`Receiver`], created by [`iter`]. +/// +/// This iterator will block whenever [`next`] is called, +/// waiting for a new message, and [`None`] will be returned +/// when the corresponding channel has hung up. +/// +/// [`iter`]: struct.Receiver.html#method.iter +/// [`Receiver`]: struct.Receiver.html +/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next +/// [`None`]: ../../../std/option/enum.Option.html#variant.None +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send(1u8).unwrap(); +/// send.send(2u8).unwrap(); +/// send.send(3u8).unwrap(); +/// }); +/// +/// for x in recv.iter() { +/// println!("Got: {}", x); +/// } +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +#[derive(Debug)] +pub struct Iter<'a, T: 'a> { + rx: &'a Receiver<T>, +} + +/// An iterator that attempts to yield all pending values for a [`Receiver`], +/// created by [`try_iter`]. +/// +/// [`None`] will be returned when there are no pending values remaining or +/// if the corresponding channel has hung up. +/// +/// This iterator will never block the caller in order to wait for data to +/// become available. Instead, it will return [`None`]. +/// +/// [`Receiver`]: struct.Receiver.html +/// [`try_iter`]: struct.Receiver.html#method.try_iter +/// [`None`]: ../../../std/option/enum.Option.html#variant.None +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// use std::time::Duration; +/// +/// let (sender, receiver) = channel(); +/// +/// // Nothing is in the buffer yet +/// assert!(receiver.try_iter().next().is_none()); +/// println!("Nothing in the buffer..."); +/// +/// thread::spawn(move || { +/// sender.send(1).unwrap(); +/// sender.send(2).unwrap(); +/// sender.send(3).unwrap(); +/// }); +/// +/// println!("Going to sleep..."); +/// thread::sleep(Duration::from_secs(2)); // block for two seconds +/// +/// for x in receiver.try_iter() { +/// println!("Got: {}", x); +/// } +/// ``` +#[stable(feature = "receiver_try_iter", since = "1.15.0")] +#[derive(Debug)] +pub struct TryIter<'a, T: 'a> { + rx: &'a Receiver<T>, +} + +/// An owning iterator over messages on a [`Receiver`], +/// created by **Receiver::into_iter**. +/// +/// This iterator will block whenever [`next`] +/// is called, waiting for a new message, and [`None`] will be +/// returned if the corresponding channel has hung up. +/// +/// [`Receiver`]: struct.Receiver.html +/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next +/// [`None`]: ../../../std/option/enum.Option.html#variant.None +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send(1u8).unwrap(); +/// send.send(2u8).unwrap(); +/// send.send(3u8).unwrap(); +/// }); +/// +/// for x in recv.into_iter() { +/// println!("Got: {}", x); +/// } +/// ``` +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +#[derive(Debug)] +pub struct IntoIter<T> { + rx: Receiver<T>, +} + +/// The sending-half of Rust's asynchronous [`channel`] type. This half can only be +/// owned by one thread, but it can be cloned to send to other threads. +/// +/// Messages can be sent through this channel with [`send`]. +/// +/// [`channel`]: fn.channel.html +/// [`send`]: struct.Sender.html#method.send +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (sender, receiver) = channel(); +/// let sender2 = sender.clone(); +/// +/// // First thread owns sender +/// thread::spawn(move || { +/// sender.send(1).unwrap(); +/// }); +/// +/// // Second thread owns sender2 +/// thread::spawn(move || { +/// sender2.send(2).unwrap(); +/// }); +/// +/// let msg = receiver.recv().unwrap(); +/// let msg2 = receiver.recv().unwrap(); +/// +/// assert_eq!(3, msg + msg2); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct Sender<T> { + inner: UnsafeCell<Flavor<T>>, +} + +// The send port can be sent from place to place, so long as it +// is not used to send non-sendable things. +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl<T: Send> Send for Sender<T> {} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> !Sync for Sender<T> {} + +/// The sending-half of Rust's synchronous [`sync_channel`] type. +/// +/// Messages can be sent through this channel with [`send`] or [`try_send`]. +/// +/// [`send`] will block if there is no space in the internal buffer. +/// +/// [`sync_channel`]: fn.sync_channel.html +/// [`send`]: struct.SyncSender.html#method.send +/// [`try_send`]: struct.SyncSender.html#method.try_send +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::sync_channel; +/// use std::thread; +/// +/// // Create a sync_channel with buffer size 2 +/// let (sync_sender, receiver) = sync_channel(2); +/// let sync_sender2 = sync_sender.clone(); +/// +/// // First thread owns sync_sender +/// thread::spawn(move || { +/// sync_sender.send(1).unwrap(); +/// sync_sender.send(2).unwrap(); +/// }); +/// +/// // Second thread owns sync_sender2 +/// thread::spawn(move || { +/// sync_sender2.send(3).unwrap(); +/// // thread will now block since the buffer is full +/// println!("Thread unblocked!"); +/// }); +/// +/// let mut msg; +/// +/// msg = receiver.recv().unwrap(); +/// println!("message {} received", msg); +/// +/// // "Thread unblocked!" will be printed now +/// +/// msg = receiver.recv().unwrap(); +/// println!("message {} received", msg); +/// +/// msg = receiver.recv().unwrap(); +/// +/// println!("message {} received", msg); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct SyncSender<T> { + inner: Arc<sync::Packet<T>>, +} + +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl<T: Send> Send for SyncSender<T> {} + +/// An error returned from the [`Sender::send`] or [`SyncSender::send`] +/// function on **channel**s. +/// +/// A **send** operation can only fail if the receiving end of a channel is +/// disconnected, implying that the data could never be received. The error +/// contains the data being sent as a payload so it can be recovered. +/// +/// [`Sender::send`]: struct.Sender.html#method.send +/// [`SyncSender::send`]: struct.SyncSender.html#method.send +#[stable(feature = "rust1", since = "1.0.0")] +#[derive(PartialEq, Eq, Clone, Copy)] +pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T); + +/// An error returned from the [`recv`] function on a [`Receiver`]. +/// +/// The [`recv`] operation can only fail if the sending half of a +/// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further +/// messages will ever be received. +/// +/// [`recv`]: struct.Receiver.html#method.recv +/// [`Receiver`]: struct.Receiver.html +/// [`channel`]: fn.channel.html +/// [`sync_channel`]: fn.sync_channel.html +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[stable(feature = "rust1", since = "1.0.0")] +pub struct RecvError; + +/// This enumeration is the list of the possible reasons that [`try_recv`] could +/// not return data when called. This can occur with both a [`channel`] and +/// a [`sync_channel`]. +/// +/// [`try_recv`]: struct.Receiver.html#method.try_recv +/// [`channel`]: fn.channel.html +/// [`sync_channel`]: fn.sync_channel.html +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[stable(feature = "rust1", since = "1.0.0")] +pub enum TryRecvError { + /// This **channel** is currently empty, but the **Sender**(s) have not yet + /// disconnected, so data may yet become available. + #[stable(feature = "rust1", since = "1.0.0")] + Empty, + + /// The **channel**'s sending half has become disconnected, and there will + /// never be any more data received on it. + #[stable(feature = "rust1", since = "1.0.0")] + Disconnected, +} + +/// This enumeration is the list of possible errors that made [`recv_timeout`] +/// unable to return data when called. This can occur with both a [`channel`] and +/// a [`sync_channel`]. +/// +/// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout +/// [`channel`]: fn.channel.html +/// [`sync_channel`]: fn.sync_channel.html +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] +pub enum RecvTimeoutError { + /// This **channel** is currently empty, but the **Sender**(s) have not yet + /// disconnected, so data may yet become available. + #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] + Timeout, + /// The **channel**'s sending half has become disconnected, and there will + /// never be any more data received on it. + #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] + Disconnected, +} + +/// This enumeration is the list of the possible error outcomes for the +/// [`try_send`] method. +/// +/// [`try_send`]: struct.SyncSender.html#method.try_send +#[stable(feature = "rust1", since = "1.0.0")] +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum TrySendError<T> { + /// The data could not be sent on the [`sync_channel`] because it would require that + /// the callee block to send the data. + /// + /// If this is a buffered channel, then the buffer is full at this time. If + /// this is not a buffered channel, then there is no [`Receiver`] available to + /// acquire the data. + /// + /// [`sync_channel`]: fn.sync_channel.html + /// [`Receiver`]: struct.Receiver.html + #[stable(feature = "rust1", since = "1.0.0")] + Full(#[stable(feature = "rust1", since = "1.0.0")] T), + + /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be + /// sent. The data is returned back to the callee in this case. + /// + /// [`sync_channel`]: fn.sync_channel.html + #[stable(feature = "rust1", since = "1.0.0")] + Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T), +} + +enum Flavor<T> { + Oneshot(Arc<oneshot::Packet<T>>), + Stream(Arc<stream::Packet<T>>), + Shared(Arc<shared::Packet<T>>), + Sync(Arc<sync::Packet<T>>), +} + +#[doc(hidden)] +trait UnsafeFlavor<T> { + fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>; + unsafe fn inner_mut(&self) -> &mut Flavor<T> { + &mut *self.inner_unsafe().get() + } + unsafe fn inner(&self) -> &Flavor<T> { + &*self.inner_unsafe().get() + } +} +impl<T> UnsafeFlavor<T> for Sender<T> { + fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> { + &self.inner + } +} +impl<T> UnsafeFlavor<T> for Receiver<T> { + fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> { + &self.inner + } +} + +/// Creates a new asynchronous channel, returning the sender/receiver halves. +/// All data sent on the [`Sender`] will become available on the [`Receiver`] in +/// the same order as it was sent, and no [`send`] will block the calling thread +/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will +/// block after its buffer limit is reached). [`recv`] will block until a message +/// is available. +/// +/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but +/// only one [`Receiver`] is supported. +/// +/// If the [`Receiver`] is disconnected while trying to [`send`] with the +/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the +/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will +/// return a [`RecvError`]. +/// +/// [`send`]: struct.Sender.html#method.send +/// [`recv`]: struct.Receiver.html#method.recv +/// [`Sender`]: struct.Sender.html +/// [`Receiver`]: struct.Receiver.html +/// [`sync_channel`]: fn.sync_channel.html +/// [`SendError`]: struct.SendError.html +/// [`RecvError`]: struct.RecvError.html +/// +/// # Examples +/// +/// ``` +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (sender, receiver) = channel(); +/// +/// // Spawn off an expensive computation +/// thread::spawn(move|| { +/// # fn expensive_computation() {} +/// sender.send(expensive_computation()).unwrap(); +/// }); +/// +/// // Do some useful work for awhile +/// +/// // Let's see what that answer was +/// println!("{:?}", receiver.recv().unwrap()); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let a = Arc::new(oneshot::Packet::new()); + (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a))) +} + +/// Creates a new synchronous, bounded channel. +/// All data sent on the [`SyncSender`] will become available on the [`Receiver`] +/// in the same order as it was sent. Like asynchronous [`channel`]s, the +/// [`Receiver`] will block until a message becomes available. `sync_channel` +/// differs greatly in the semantics of the sender, however. +/// +/// This channel has an internal buffer on which messages will be queued. +/// `bound` specifies the buffer size. When the internal buffer becomes full, +/// future sends will *block* waiting for the buffer to open up. Note that a +/// buffer size of 0 is valid, in which case this becomes "rendezvous channel" +/// where each [`send`] will not return until a [`recv`] is paired with it. +/// +/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple +/// times, but only one [`Receiver`] is supported. +/// +/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying +/// to [`send`] with the [`SyncSender`], the [`send`] method will return a +/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying +/// to [`recv`], the [`recv`] method will return a [`RecvError`]. +/// +/// [`channel`]: fn.channel.html +/// [`send`]: struct.SyncSender.html#method.send +/// [`recv`]: struct.Receiver.html#method.recv +/// [`SyncSender`]: struct.SyncSender.html +/// [`Receiver`]: struct.Receiver.html +/// [`SendError`]: struct.SendError.html +/// [`RecvError`]: struct.RecvError.html +/// +/// # Examples +/// +/// ``` +/// use std::sync::mpsc::sync_channel; +/// use std::thread; +/// +/// let (sender, receiver) = sync_channel(1); +/// +/// // this returns immediately +/// sender.send(1).unwrap(); +/// +/// thread::spawn(move|| { +/// // this will block until the previous message has been received +/// sender.send(2).unwrap(); +/// }); +/// +/// assert_eq!(receiver.recv().unwrap(), 1); +/// assert_eq!(receiver.recv().unwrap(), 2); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { + let a = Arc::new(sync::Packet::new(bound)); + (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a))) +} + +//////////////////////////////////////////////////////////////////////////////// +// Sender +//////////////////////////////////////////////////////////////////////////////// + +impl<T> Sender<T> { + fn new(inner: Flavor<T>) -> Sender<T> { + Sender { inner: UnsafeCell::new(inner) } + } + + /// Attempts to send a value on this channel, returning it back if it could + /// not be sent. + /// + /// A successful send occurs when it is determined that the other end of + /// the channel has not hung up already. An unsuccessful send would be one + /// where the corresponding receiver has already been deallocated. Note + /// that a return value of [`Err`] means that the data will never be + /// received, but a return value of [`Ok`] does *not* mean that the data + /// will be received. It is possible for the corresponding receiver to + /// hang up immediately after this function returns [`Ok`]. + /// + /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err + /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok + /// + /// This method will never block the current thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::mpsc::channel; + /// + /// let (tx, rx) = channel(); + /// + /// // This send is always successful + /// tx.send(1).unwrap(); + /// + /// // This send will fail because the receiver is gone + /// drop(rx); + /// assert_eq!(tx.send(1).unwrap_err().0, 1); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + let (new_inner, ret) = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => { + if !p.sent() { + return p.send(t).map_err(SendError); + } else { + let a = Arc::new(stream::Packet::new()); + let rx = Receiver::new(Flavor::Stream(a.clone())); + match p.upgrade(rx) { + oneshot::UpSuccess => { + let ret = a.send(t); + (a, ret) + } + oneshot::UpDisconnected => (a, Err(t)), + oneshot::UpWoke(token) => { + // This send cannot panic because the thread is + // asleep (we're looking at it), so the receiver + // can't go away. + a.send(t).ok().unwrap(); + token.signal(); + (a, Ok(())) + } + } + } + } + Flavor::Stream(ref p) => return p.send(t).map_err(SendError), + Flavor::Shared(ref p) => return p.send(t).map_err(SendError), + Flavor::Sync(..) => unreachable!(), + }; + + unsafe { + let tmp = Sender::new(Flavor::Stream(new_inner)); + mem::swap(self.inner_mut(), tmp.inner_mut()); + } + ret.map_err(SendError) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + let packet = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => { + let a = Arc::new(shared::Packet::new()); + { + let guard = a.postinit_lock(); + let rx = Receiver::new(Flavor::Shared(a.clone())); + let sleeper = match p.upgrade(rx) { + oneshot::UpSuccess | oneshot::UpDisconnected => None, + oneshot::UpWoke(task) => Some(task), + }; + a.inherit_blocker(sleeper, guard); + } + a + } + Flavor::Stream(ref p) => { + let a = Arc::new(shared::Packet::new()); + { + let guard = a.postinit_lock(); + let rx = Receiver::new(Flavor::Shared(a.clone())); + let sleeper = match p.upgrade(rx) { + stream::UpSuccess | stream::UpDisconnected => None, + stream::UpWoke(task) => Some(task), + }; + a.inherit_blocker(sleeper, guard); + } + a + } + Flavor::Shared(ref p) => { + p.clone_chan(); + return Sender::new(Flavor::Shared(p.clone())); + } + Flavor::Sync(..) => unreachable!(), + }; + + unsafe { + let tmp = Sender::new(Flavor::Shared(packet.clone())); + mem::swap(self.inner_mut(), tmp.inner_mut()); + } + Sender::new(Flavor::Shared(packet)) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => p.drop_chan(), + Flavor::Stream(ref p) => p.drop_chan(), + Flavor::Shared(ref p) => p.drop_chan(), + Flavor::Sync(..) => unreachable!(), + } + } +} + +#[stable(feature = "mpsc_debug", since = "1.8.0")] +impl<T> fmt::Debug for Sender<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Sender").finish() + } +} + +//////////////////////////////////////////////////////////////////////////////// +// SyncSender +//////////////////////////////////////////////////////////////////////////////// + +impl<T> SyncSender<T> { + fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> { + SyncSender { inner } + } + + /// Sends a value on this synchronous channel. + /// + /// This function will *block* until space in the internal buffer becomes + /// available or a receiver is available to hand off the message to. + /// + /// Note that a successful send does *not* guarantee that the receiver will + /// ever see the data if there is a buffer on this channel. Items may be + /// enqueued in the internal buffer for the receiver to receive at a later + /// time. If the buffer size is 0, however, the channel becomes a rendezvous + /// channel and it guarantees that the receiver has indeed received + /// the data if this function returns success. + /// + /// This function will never panic, but it may return [`Err`] if the + /// [`Receiver`] has disconnected and is no longer able to receive + /// information. + /// + /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err + /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::sync_channel; + /// use std::thread; + /// + /// // Create a rendezvous sync_channel with buffer size 0 + /// let (sync_sender, receiver) = sync_channel(0); + /// + /// thread::spawn(move || { + /// println!("sending message..."); + /// sync_sender.send(1).unwrap(); + /// // Thread is now blocked until the message is received + /// + /// println!("...message received!"); + /// }); + /// + /// let msg = receiver.recv().unwrap(); + /// assert_eq!(1, msg); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.inner.send(t).map_err(SendError) + } + + /// Attempts to send a value on this channel without blocking. + /// + /// This method differs from [`send`] by returning immediately if the + /// channel's buffer is full or no receiver is waiting to acquire some + /// data. Compared with [`send`], this function has two failure cases + /// instead of one (one for disconnection, one for a full buffer). + /// + /// See [`send`] for notes about guarantees of whether the + /// receiver has received the data or not if this function is successful. + /// + /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::sync_channel; + /// use std::thread; + /// + /// // Create a sync_channel with buffer size 1 + /// let (sync_sender, receiver) = sync_channel(1); + /// let sync_sender2 = sync_sender.clone(); + /// + /// // First thread owns sync_sender + /// thread::spawn(move || { + /// sync_sender.send(1).unwrap(); + /// sync_sender.send(2).unwrap(); + /// // Thread blocked + /// }); + /// + /// // Second thread owns sync_sender2 + /// thread::spawn(move || { + /// // This will return an error and send + /// // no message if the buffer is full + /// let _ = sync_sender2.try_send(3); + /// }); + /// + /// let mut msg; + /// msg = receiver.recv().unwrap(); + /// println!("message {} received", msg); + /// + /// msg = receiver.recv().unwrap(); + /// println!("message {} received", msg); + /// + /// // Third message may have never been sent + /// match receiver.try_recv() { + /// Ok(msg) => println!("message {} received", msg), + /// Err(_) => println!("the third message was never sent"), + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { + self.inner.try_send(t) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> Clone for SyncSender<T> { + fn clone(&self) -> SyncSender<T> { + self.inner.clone_chan(); + SyncSender::new(self.inner.clone()) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> Drop for SyncSender<T> { + fn drop(&mut self) { + self.inner.drop_chan(); + } +} + +#[stable(feature = "mpsc_debug", since = "1.8.0")] +impl<T> fmt::Debug for SyncSender<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SyncSender").finish() + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Receiver +//////////////////////////////////////////////////////////////////////////////// + +impl<T> Receiver<T> { + fn new(inner: Flavor<T>) -> Receiver<T> { + Receiver { inner: UnsafeCell::new(inner) } + } + + /// Attempts to return a pending value on this receiver without blocking. + /// + /// This method will never block the caller in order to wait for data to + /// become available. Instead, this will always return immediately with a + /// possible option of pending data on the channel. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a receiver. + /// + /// Compared with [`recv`], this function has two failure cases instead of one + /// (one for disconnection, one for an empty buffer). + /// + /// [`recv`]: struct.Receiver.html#method.recv + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::{Receiver, channel}; + /// + /// let (_, receiver): (_, Receiver<i32>) = channel(); + /// + /// assert!(receiver.try_recv().is_err()); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_recv(&self) -> Result<T, TryRecvError> { + loop { + let new_port = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(oneshot::Empty) => return Err(TryRecvError::Empty), + Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected), + Err(oneshot::Upgraded(rx)) => rx, + }, + Flavor::Stream(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(stream::Empty) => return Err(TryRecvError::Empty), + Err(stream::Disconnected) => return Err(TryRecvError::Disconnected), + Err(stream::Upgraded(rx)) => rx, + }, + Flavor::Shared(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(shared::Empty) => return Err(TryRecvError::Empty), + Err(shared::Disconnected) => return Err(TryRecvError::Disconnected), + }, + Flavor::Sync(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(sync::Empty) => return Err(TryRecvError::Empty), + Err(sync::Disconnected) => return Err(TryRecvError::Disconnected), + }, + }; + unsafe { + mem::swap(self.inner_mut(), new_port.inner_mut()); + } + } + } + + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent. Once a message is + /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this + /// receiver will wake up and return that message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. + /// + /// [`Sender`]: struct.Sender.html + /// [`SyncSender`]: struct.SyncSender.html + /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err + /// + /// # Examples + /// + /// ``` + /// use std::sync::mpsc; + /// use std::thread; + /// + /// let (send, recv) = mpsc::channel(); + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert_eq!(Ok(1), recv.recv()); + /// ``` + /// + /// Buffering behavior: + /// + /// ``` + /// use std::sync::mpsc; + /// use std::thread; + /// use std::sync::mpsc::RecvError; + /// + /// let (send, recv) = mpsc::channel(); + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// send.send(2).unwrap(); + /// send.send(3).unwrap(); + /// drop(send); + /// }); + /// + /// // wait for the thread to join so we ensure the sender is dropped + /// handle.join().unwrap(); + /// + /// assert_eq!(Ok(1), recv.recv()); + /// assert_eq!(Ok(2), recv.recv()); + /// assert_eq!(Ok(3), recv.recv()); + /// assert_eq!(Err(RecvError), recv.recv()); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn recv(&self) -> Result<T, RecvError> { + loop { + let new_port = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(oneshot::Disconnected) => return Err(RecvError), + Err(oneshot::Upgraded(rx)) => rx, + Err(oneshot::Empty) => unreachable!(), + }, + Flavor::Stream(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(stream::Disconnected) => return Err(RecvError), + Err(stream::Upgraded(rx)) => rx, + Err(stream::Empty) => unreachable!(), + }, + Flavor::Shared(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(shared::Disconnected) => return Err(RecvError), + Err(shared::Empty) => unreachable!(), + }, + Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError), + }; + unsafe { + mem::swap(self.inner_mut(), new_port.inner_mut()); + } + } + } + + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up, or if it waits more than `timeout`. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent. Once a message is + /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this + /// receiver will wake up and return that message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. + /// + /// [`Sender`]: struct.Sender.html + /// [`SyncSender`]: struct.SyncSender.html + /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err + /// + /// # Known Issues + /// + /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout` + /// to panic unexpectedly with the following example: + /// + /// ```no_run + /// use std::sync::mpsc::channel; + /// use std::thread; + /// use std::time::Duration; + /// + /// let (tx, rx) = channel::<String>(); + /// + /// thread::spawn(move || { + /// let d = Duration::from_millis(10); + /// loop { + /// println!("recv"); + /// let _r = rx.recv_timeout(d); + /// } + /// }); + /// + /// thread::sleep(Duration::from_millis(100)); + /// let _c1 = tx.clone(); + /// + /// thread::sleep(Duration::from_secs(1)); + /// ``` + /// + /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364 + /// + /// # Examples + /// + /// Successfully receiving value before encountering timeout: + /// + /// ```no_run + /// use std::thread; + /// use std::time::Duration; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_timeout(Duration::from_millis(400)), + /// Ok('a') + /// ); + /// ``` + /// + /// Receiving an error upon reaching timeout: + /// + /// ```no_run + /// use std::thread; + /// use std::time::Duration; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(800)); + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_timeout(Duration::from_millis(400)), + /// Err(mpsc::RecvTimeoutError::Timeout) + /// ); + /// ``` + #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] + pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { + // Do an optimistic try_recv to avoid the performance impact of + // Instant::now() in the full-channel case. + match self.try_recv() { + Ok(result) => Ok(result), + Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected), + Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) { + Some(deadline) => self.recv_deadline(deadline), + // So far in the future that it's practically the same as waiting indefinitely. + None => self.recv().map_err(RecvTimeoutError::from), + }, + } + } + + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up, or if `deadline` is reached. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent. Once a message is + /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this + /// receiver will wake up and return that message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. + /// + /// [`Sender`]: struct.Sender.html + /// [`SyncSender`]: struct.SyncSender.html + /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err + /// + /// # Examples + /// + /// Successfully receiving value before reaching deadline: + /// + /// ```no_run + /// #![feature(deadline_api)] + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Ok('a') + /// ); + /// ``` + /// + /// Receiving an error upon reaching deadline: + /// + /// ```no_run + /// #![feature(deadline_api)] + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(800)); + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Err(mpsc::RecvTimeoutError::Timeout) + /// ); + /// ``` + #[unstable(feature = "deadline_api", issue = "46316")] + pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { + use self::RecvTimeoutError::*; + + loop { + let port_or_empty = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(oneshot::Disconnected) => return Err(Disconnected), + Err(oneshot::Upgraded(rx)) => Some(rx), + Err(oneshot::Empty) => None, + }, + Flavor::Stream(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(stream::Disconnected) => return Err(Disconnected), + Err(stream::Upgraded(rx)) => Some(rx), + Err(stream::Empty) => None, + }, + Flavor::Shared(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(shared::Disconnected) => return Err(Disconnected), + Err(shared::Empty) => None, + }, + Flavor::Sync(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(sync::Disconnected) => return Err(Disconnected), + Err(sync::Empty) => None, + }, + }; + + if let Some(new_port) = port_or_empty { + unsafe { + mem::swap(self.inner_mut(), new_port.inner_mut()); + } + } + + // If we're already passed the deadline, and we're here without + // data, return a timeout, else try again. + if Instant::now() >= deadline { + return Err(Timeout); + } + } + } + + /// Returns an iterator that will block waiting for messages, but never + /// [`panic!`]. It will return [`None`] when the channel has hung up. + /// + /// [`panic!`]: ../../../std/macro.panic.html + /// [`None`]: ../../../std/option/enum.Option.html#variant.None + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::channel; + /// use std::thread; + /// + /// let (send, recv) = channel(); + /// + /// thread::spawn(move || { + /// send.send(1).unwrap(); + /// send.send(2).unwrap(); + /// send.send(3).unwrap(); + /// }); + /// + /// let mut iter = recv.iter(); + /// assert_eq!(iter.next(), Some(1)); + /// assert_eq!(iter.next(), Some(2)); + /// assert_eq!(iter.next(), Some(3)); + /// assert_eq!(iter.next(), None); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn iter(&self) -> Iter<'_, T> { + Iter { rx: self } + } + + /// Returns an iterator that will attempt to yield all pending values. + /// It will return `None` if there are no more pending values or if the + /// channel has hung up. The iterator will never [`panic!`] or block the + /// user by waiting for values. + /// + /// [`panic!`]: ../../../std/macro.panic.html + /// + /// # Examples + /// + /// ```no_run + /// use std::sync::mpsc::channel; + /// use std::thread; + /// use std::time::Duration; + /// + /// let (sender, receiver) = channel(); + /// + /// // nothing is in the buffer yet + /// assert!(receiver.try_iter().next().is_none()); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// sender.send(1).unwrap(); + /// sender.send(2).unwrap(); + /// sender.send(3).unwrap(); + /// }); + /// + /// // nothing is in the buffer yet + /// assert!(receiver.try_iter().next().is_none()); + /// + /// // block for two seconds + /// thread::sleep(Duration::from_secs(2)); + /// + /// let mut iter = receiver.try_iter(); + /// assert_eq!(iter.next(), Some(1)); + /// assert_eq!(iter.next(), Some(2)); + /// assert_eq!(iter.next(), Some(3)); + /// assert_eq!(iter.next(), None); + /// ``` + #[stable(feature = "receiver_try_iter", since = "1.15.0")] + pub fn try_iter(&self) -> TryIter<'_, T> { + TryIter { rx: self } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<'a, T> Iterator for Iter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.rx.recv().ok() + } +} + +#[stable(feature = "receiver_try_iter", since = "1.15.0")] +impl<'a, T> Iterator for TryIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.rx.try_recv().ok() + } +} + +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +impl<'a, T> IntoIterator for &'a Receiver<T> { + type Item = T; + type IntoIter = Iter<'a, T>; + + fn into_iter(self) -> Iter<'a, T> { + self.iter() + } +} + +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +impl<T> Iterator for IntoIter<T> { + type Item = T; + fn next(&mut self) -> Option<T> { + self.rx.recv().ok() + } +} + +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +impl<T> IntoIterator for Receiver<T> { + type Item = T; + type IntoIter = IntoIter<T>; + + fn into_iter(self) -> IntoIter<T> { + IntoIter { rx: self } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => p.drop_port(), + Flavor::Stream(ref p) => p.drop_port(), + Flavor::Shared(ref p) => p.drop_port(), + Flavor::Sync(ref p) => p.drop_port(), + } + } +} + +#[stable(feature = "mpsc_debug", since = "1.8.0")] +impl<T> fmt::Debug for Receiver<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Receiver").finish() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "SendError(..)".fmt(f) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "sending on a closed channel".fmt(f) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: Send> error::Error for SendError<T> { + #[allow(deprecated)] + fn description(&self) -> &str { + "sending on a closed channel" + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> fmt::Debug for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TrySendError::Full(..) => "Full(..)".fmt(f), + TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TrySendError::Full(..) => "sending on a full channel".fmt(f), + TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: Send> error::Error for TrySendError<T> { + #[allow(deprecated)] + fn description(&self) -> &str { + match *self { + TrySendError::Full(..) => "sending on a full channel", + TrySendError::Disconnected(..) => "sending on a closed channel", + } + } +} + +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl<T> From<SendError<T>> for TrySendError<T> { + fn from(err: SendError<T>) -> TrySendError<T> { + match err { + SendError(t) => TrySendError::Disconnected(t), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "receiving on a closed channel".fmt(f) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl error::Error for RecvError { + #[allow(deprecated)] + fn description(&self) -> &str { + "receiving on a closed channel" + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TryRecvError::Empty => "receiving on an empty channel".fmt(f), + TryRecvError::Disconnected => "receiving on a closed channel".fmt(f), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl error::Error for TryRecvError { + #[allow(deprecated)] + fn description(&self) -> &str { + match *self { + TryRecvError::Empty => "receiving on an empty channel", + TryRecvError::Disconnected => "receiving on a closed channel", + } + } +} + +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl From<RecvError> for TryRecvError { + fn from(err: RecvError) -> TryRecvError { + match err { + RecvError => TryRecvError::Disconnected, + } + } +} + +#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] +impl fmt::Display for RecvTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f), + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f), + } + } +} + +#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] +impl error::Error for RecvTimeoutError { + #[allow(deprecated)] + fn description(&self) -> &str { + match *self { + RecvTimeoutError::Timeout => "timed out waiting on channel", + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed", + } + } +} + +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl From<RecvError> for RecvTimeoutError { + fn from(err: RecvError) -> RecvTimeoutError { + match err { + RecvError => RecvTimeoutError::Disconnected, + } + } +} + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests { + use super::*; + use crate::env; + use crate::thread; + use crate::time::{Duration, Instant}; + + pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) => 1, + } + } + + #[test] + fn smoke() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn drop_full() { + let (tx, _rx) = channel::<Box<isize>>(); + tx.send(box 1).unwrap(); + } + + #[test] + fn drop_full_shared() { + let (tx, _rx) = channel::<Box<isize>>(); + drop(tx.clone()); + drop(tx.clone()); + tx.send(box 1).unwrap(); + } + + #[test] + fn smoke_shared() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn smoke_threads() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn smoke_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()); + } + + #[test] + fn smoke_shared_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()) + } + + #[test] + fn smoke_shared_port_gone2() { + let (tx, rx) = channel::<i32>(); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); + } + + #[test] + fn port_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} + } + + #[test] + fn port_gone_concurrent_shared() { + let (tx, rx) = channel::<i32>(); + let tx2 = tx.clone(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} + } + + #[test] + fn smoke_chan_gone() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn smoke_chan_gone_shared() { + let (tx, rx) = channel::<()>(); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); + } + + #[test] + fn chan_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} + } + + #[test] + fn stress() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + for _ in 0..10000 { + assert_eq!(rx.recv().unwrap(), 1); + } + t.join().ok().expect("thread panicked"); + } + + #[test] + fn stress_shared() { + const AMT: u32 = 10000; + const NTHREADS: u32 = 8; + let (tx, rx) = channel::<i32>(); + + let t = thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + } + drop(tx); + t.join().ok().expect("thread panicked"); + } + + #[test] + fn send_from_outside_runtime() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + tx1.send(()).unwrap(); + for _ in 0..40 { + assert_eq!(rx2.recv().unwrap(), 1); + } + }); + rx1.recv().unwrap(); + let t2 = thread::spawn(move || { + for _ in 0..40 { + tx2.send(1).unwrap(); + } + }); + t1.join().ok().expect("thread panicked"); + t2.join().ok().expect("thread panicked"); + } + + #[test] + fn recv_from_outside_runtime() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for _ in 0..40 { + assert_eq!(rx.recv().unwrap(), 1); + } + }); + for _ in 0..40 { + tx.send(1).unwrap(); + } + t.join().ok().expect("thread panicked"); + } + + #[test] + fn no_runtime() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + assert_eq!(rx1.recv().unwrap(), 1); + tx2.send(2).unwrap(); + }); + let t2 = thread::spawn(move || { + tx1.send(1).unwrap(); + assert_eq!(rx2.recv().unwrap(), 2); + }); + t1.join().ok().expect("thread panicked"); + t2.join().ok().expect("thread panicked"); + } + + #[test] + fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = channel::<i32>(); + drop(rx); + } + + #[test] + fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = channel::<i32>(); + drop(tx); + } + + #[test] + fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = channel::<Box<i32>>(); + drop(rx); + assert!(tx.send(box 0).is_err()); + } + + #[test] + fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will panic + let res = thread::spawn(move || { + let (tx, rx) = channel::<i32>(); + drop(tx); + rx.recv().unwrap(); + }) + .join(); + // What is our res? + assert!(res.is_err()); + } + + #[test] + fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = channel::<Box<i32>>(); + tx.send(box 10).unwrap(); + assert!(*rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_open() { + let (tx, rx) = channel::<i32>(); + assert!(tx.send(10).is_ok()); + assert!(rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(10).is_err()); + } + + #[test] + fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = channel::<i32>(); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); + } + + #[test] + fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn oneshot_single_thread_peek_data() { + let (tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); + } + + #[test] + fn oneshot_single_thread_peek_close() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_open() { + let (_tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[test] + fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = channel::<Box<i32>>(); + let _t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(box 10).unwrap(); + } + + #[test] + fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = channel::<Box<i32>>(); + let _t = thread::spawn(move || { + drop(tx); + }); + let res = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }) + .join(); + assert!(res.is_err()); + } + + #[test] + fn oneshot_multi_thread_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + drop(rx); + }); + drop(tx); + } + } + + #[test] + fn oneshot_multi_thread_send_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + drop(rx); + }); + let _ = thread::spawn(move || { + tx.send(1).unwrap(); + }) + .join(); + } + } + + #[test] + fn oneshot_multi_thread_recv_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<i32>(); + thread::spawn(move || { + let res = thread::spawn(move || { + rx.recv().unwrap(); + }) + .join(); + assert!(res.is_err()); + }); + let _t = thread::spawn(move || { + thread::spawn(move || { + drop(tx); + }); + }); + } + } + + #[test] + fn oneshot_multi_thread_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<Box<isize>>(); + let _t = thread::spawn(move || { + tx.send(box 10).unwrap(); + }); + assert!(*rx.recv().unwrap() == 10); + } + } + + #[test] + fn stream_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel(); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: Sender<Box<i32>>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + tx.send(box i).unwrap(); + send(tx, i + 1); + }); + } + + fn recv(rx: Receiver<Box<i32>>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + }); + } + } + } + + #[test] + fn oneshot_single_thread_recv_timeout() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + } + + #[test] + fn stress_recv_timeout_two_threads() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + let timeout = Duration::from_millis(100); + + thread::spawn(move || { + for i in 0..stress { + if i % 2 == 0 { + thread::sleep(timeout * 2); + } + tx.send(1usize).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(timeout) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); + } + + #[test] + fn recv_timeout_upgrade() { + let (tx, rx) = channel::<()>(); + let timeout = Duration::from_millis(1); + let _tx_clone = tx.clone(); + + let start = Instant::now(); + assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); + assert!(Instant::now() >= start + timeout); + } + + #[test] + fn stress_recv_timeout_shared() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + + for i in 0..stress { + let tx = tx.clone(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(i as u64 * 10)); + tx.send(1usize).unwrap(); + }); + } + + drop(tx); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); + } + + #[test] + fn very_long_recv_timeout_wont_panic() { + let (tx, rx) = channel::<()>(); + let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX))); + thread::sleep(Duration::from_secs(1)); + assert!(tx.send(()).is_ok()); + assert_eq!(join_handle.join().unwrap(), Ok(())); + } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = channel(); + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } + } + + #[test] + fn shared_recv_timeout() { + let (tx, rx) = channel(); + let total = 5; + for _ in 0..total { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + } + + #[test] + fn shared_chan_stress() { + let (tx, rx) = channel(); + let total = stress_factor() + 100; + for _ in 0..total { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + } + + #[test] + fn test_nested_recv_iter() { + let (tx, rx) = channel::<i32>(); + let (total_tx, total_rx) = channel::<i32>(); + + let _t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); + } + + #[test] + fn test_recv_iter_break() { + let (tx, rx) = channel::<i32>(); + let (count_tx, count_rx) = channel(); + + let _t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); + } + + #[test] + fn test_recv_try_iter() { + let (request_tx, request_rx) = channel(); + let (response_tx, response_rx) = channel(); + + // Request `x`s until we have `6`. + let t = thread::spawn(move || { + let mut count = 0; + loop { + for x in response_rx.try_iter() { + count += x; + if count == 6 { + return count; + } + } + request_tx.send(()).unwrap(); + } + }); + + for _ in request_rx.iter() { + if response_tx.send(2).is_err() { + break; + } + } + + assert_eq!(t.join().unwrap(), 6); + } + + #[test] + fn test_recv_into_iter_owned() { + let mut iter = { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + + rx.into_iter() + }; + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); + } + + #[test] + fn test_recv_into_iter_borrowed() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + let mut iter = (&rx).into_iter(); + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); + } + + #[test] + fn try_recv_states() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + let _t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); + } + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + #[test] + fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let t = tx.clone(); + drop(tx); + t.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); + } + + #[test] + fn issue_32114() { + let (tx, _) = channel(); + let _ = tx.send(123); + assert_eq!(tx.send(123), Err(SendError(123))); + } +} + +#[cfg(all(test, not(target_os = "emscripten")))] +mod sync_tests { + use super::*; + use crate::env; + use crate::thread; + use crate::time::Duration; + + pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) => 1, + } + } + + #[test] + fn smoke() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn drop_full() { + let (tx, _rx) = sync_channel::<Box<isize>>(1); + tx.send(box 1).unwrap(); + } + + #[test] + fn smoke_shared() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn recv_timeout() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); + tx.send(1).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1)); + } + + #[test] + fn smoke_threads() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn smoke_port_gone() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + assert!(tx.send(1).is_err()); + } + + #[test] + fn smoke_shared_port_gone2() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); + } + + #[test] + fn port_gone_concurrent() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} + } + + #[test] + fn port_gone_concurrent_shared() { + let (tx, rx) = sync_channel::<i32>(0); + let tx2 = tx.clone(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} + } + + #[test] + fn smoke_chan_gone() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn smoke_chan_gone_shared() { + let (tx, rx) = sync_channel::<()>(0); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); + } + + #[test] + fn chan_gone_concurrent() { + let (tx, rx) = sync_channel::<i32>(0); + thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} + } + + #[test] + fn stress() { + let (tx, rx) = sync_channel::<i32>(0); + thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + for _ in 0..10000 { + assert_eq!(rx.recv().unwrap(), 1); + } + } + + #[test] + fn stress_recv_timeout_two_threads() { + let (tx, rx) = sync_channel::<i32>(0); + + thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(1)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, 10000); + } + + #[test] + fn stress_recv_timeout_shared() { + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::<i32>(0); + let (dtx, drx) = sync_channel::<()>(0); + + thread::spawn(move || { + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, AMT * NTHREADS); + assert!(rx.try_recv().is_err()); + + dtx.send(()).unwrap(); + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + } + + drop(tx); + + drx.recv().unwrap(); + } + + #[test] + fn stress_shared() { + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::<i32>(0); + let (dtx, drx) = sync_channel::<()>(0); + + thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + dtx.send(()).unwrap(); + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + } + drop(tx); + drx.recv().unwrap(); + } + + #[test] + fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = sync_channel::<i32>(0); + drop(rx); + } + + #[test] + fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = sync_channel::<i32>(0); + drop(tx); + } + + #[test] + fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = sync_channel::<Box<i32>>(0); + drop(rx); + assert!(tx.send(box 0).is_err()); + } + + #[test] + fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will panic + let res = thread::spawn(move || { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + rx.recv().unwrap(); + }) + .join(); + // What is our res? + assert!(res.is_err()); + } + + #[test] + fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = sync_channel::<Box<i32>>(1); + tx.send(box 10).unwrap(); + assert!(*rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_open() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(10), Ok(())); + assert!(rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); + } + + #[test] + fn oneshot_single_thread_try_send_closed2() { + let (tx, _rx) = sync_channel::<i32>(0); + assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); + } + + #[test] + fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); + } + + #[test] + fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn oneshot_single_thread_try_recv_closed_with_data() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(10).unwrap(); + drop(tx); + assert_eq!(rx.try_recv(), Ok(10)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_data() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); + } + + #[test] + fn oneshot_single_thread_peek_close() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_open() { + let (_tx, rx) = sync_channel::<i32>(0); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[test] + fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let _t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(box 10).unwrap(); + } + + #[test] + fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let _t = thread::spawn(move || { + drop(tx); + }); + let res = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }) + .join(); + assert!(res.is_err()); + } + + #[test] + fn oneshot_multi_thread_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + drop(rx); + }); + drop(tx); + } + } + + #[test] + fn oneshot_multi_thread_send_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + drop(rx); + }); + let _ = thread::spawn(move || { + tx.send(1).unwrap(); + }) + .join(); + } + } + + #[test] + fn oneshot_multi_thread_recv_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + let res = thread::spawn(move || { + rx.recv().unwrap(); + }) + .join(); + assert!(res.is_err()); + }); + let _t = thread::spawn(move || { + thread::spawn(move || { + drop(tx); + }); + }); + } + } + + #[test] + fn oneshot_multi_thread_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let _t = thread::spawn(move || { + tx.send(box 10).unwrap(); + }); + assert!(*rx.recv().unwrap() == 10); + } + } + + #[test] + fn stream_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: SyncSender<Box<i32>>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + tx.send(box i).unwrap(); + send(tx, i + 1); + }); + } + + fn recv(rx: Receiver<Box<i32>>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + }); + } + } + } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = sync_channel(10000); + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } + } + + #[test] + fn shared_chan_stress() { + let (tx, rx) = sync_channel(0); + let total = stress_factor() + 100; + for _ in 0..total { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + } + + #[test] + fn test_nested_recv_iter() { + let (tx, rx) = sync_channel::<i32>(0); + let (total_tx, total_rx) = sync_channel::<i32>(0); + + let _t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); + } + + #[test] + fn test_recv_iter_break() { + let (tx, rx) = sync_channel::<i32>(0); + let (count_tx, count_rx) = sync_channel(0); + + let _t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.try_send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); + } + + #[test] + fn try_recv_states() { + let (tx1, rx1) = sync_channel::<i32>(1); + let (tx2, rx2) = sync_channel::<()>(1); + let (tx3, rx3) = sync_channel::<()>(1); + let _t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); + } + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + #[test] + fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = sync_channel::<()>(0); + let (tx2, rx2) = sync_channel::<()>(0); + let _t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let t = tx.clone(); + drop(tx); + t.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); + } + + #[test] + fn send1() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + assert_eq!(tx.send(1), Ok(())); + } + + #[test] + fn send2() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); + } + + #[test] + fn send3() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.send(1), Ok(())); + let _t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); + } + + #[test] + fn send4() { + let (tx, rx) = sync_channel::<i32>(0); + let tx2 = tx.clone(); + let (done, donerx) = channel(); + let done2 = done.clone(); + let _t = thread::spawn(move || { + assert!(tx.send(1).is_err()); + done.send(()).unwrap(); + }); + let _t = thread::spawn(move || { + assert!(tx2.send(2).is_err()); + done2.send(()).unwrap(); + }); + drop(rx); + donerx.recv().unwrap(); + donerx.recv().unwrap(); + } + + #[test] + fn try_send1() { + let (tx, _rx) = sync_channel::<i32>(0); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); + } + + #[test] + fn try_send2() { + let (tx, _rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(1), Ok(())); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); + } + + #[test] + fn try_send3() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(1), Ok(())); + drop(rx); + assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1))); + } + + #[test] + fn issue_15761() { + fn repro() { + let (tx1, rx1) = sync_channel::<()>(3); + let (tx2, rx2) = sync_channel::<()>(3); + + let _t = thread::spawn(move || { + rx1.recv().unwrap(); + tx2.try_send(()).unwrap(); + }); + + tx1.try_send(()).unwrap(); + rx2.recv().unwrap(); + } + + for _ in 0..100 { + repro() + } + } +} diff --git a/library/std/src/sync/mpsc/mpsc_queue.rs b/library/std/src/sync/mpsc/mpsc_queue.rs new file mode 100644 index 00000000000..6e7a7be4430 --- /dev/null +++ b/library/std/src/sync/mpsc/mpsc_queue.rs @@ -0,0 +1,165 @@ +//! A mostly lock-free multi-producer, single consumer queue. +//! +//! This module contains an implementation of a concurrent MPSC queue. This +//! queue can be used to share data between threads, and is also used as the +//! building block of channels in rust. +//! +//! Note that the current implementation of this queue has a caveat of the `pop` +//! method, and see the method for more information about it. Due to this +//! caveat, this queue may not be appropriate for all use-cases. + +// http://www.1024cores.net/home/lock-free-algorithms +// /queues/non-intrusive-mpsc-node-based-queue + +pub use self::PopResult::*; + +use core::cell::UnsafeCell; +use core::ptr; + +use crate::boxed::Box; +use crate::sync::atomic::{AtomicPtr, Ordering}; + +/// A result of the `pop` function. +pub enum PopResult<T> { + /// Some data has been popped + Data(T), + /// The queue is empty + Empty, + /// The queue is in an inconsistent state. Popping data should succeed, but + /// some pushers have yet to make enough progress in order allow a pop to + /// succeed. It is recommended that a pop() occur "in the near future" in + /// order to see if the sender has made progress or not + Inconsistent, +} + +struct Node<T> { + next: AtomicPtr<Node<T>>, + value: Option<T>, +} + +/// The multi-producer single-consumer structure. This is not cloneable, but it +/// may be safely shared so long as it is guaranteed that there is only one +/// popper at a time (many pushers are allowed). +pub struct Queue<T> { + head: AtomicPtr<Node<T>>, + tail: UnsafeCell<*mut Node<T>>, +} + +unsafe impl<T: Send> Send for Queue<T> {} +unsafe impl<T: Send> Sync for Queue<T> {} + +impl<T> Node<T> { + unsafe fn new(v: Option<T>) -> *mut Node<T> { + Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v }) + } +} + +impl<T> Queue<T> { + /// Creates a new queue that is safe to share among multiple producers and + /// one consumer. + pub fn new() -> Queue<T> { + let stub = unsafe { Node::new(None) }; + Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } + } + + /// Pushes a new value onto this queue. + pub fn push(&self, t: T) { + unsafe { + let n = Node::new(Some(t)); + let prev = self.head.swap(n, Ordering::AcqRel); + (*prev).next.store(n, Ordering::Release); + } + } + + /// Pops some data from this queue. + /// + /// Note that the current implementation means that this function cannot + /// return `Option<T>`. It is possible for this queue to be in an + /// inconsistent state where many pushes have succeeded and completely + /// finished, but pops cannot return `Some(t)`. This inconsistent state + /// happens when a pusher is pre-empted at an inopportune moment. + /// + /// This inconsistent state means that this queue does indeed have data, but + /// it does not currently have access to it at this time. + pub fn pop(&self) -> PopResult<T> { + unsafe { + let tail = *self.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + + if !next.is_null() { + *self.tail.get() = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take().unwrap(); + let _: Box<Node<T>> = Box::from_raw(tail); + return Data(ret); + } + + if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent } + } + } +} + +impl<T> Drop for Queue<T> { + fn drop(&mut self) { + unsafe { + let mut cur = *self.tail.get(); + while !cur.is_null() { + let next = (*cur).next.load(Ordering::Relaxed); + let _: Box<Node<T>> = Box::from_raw(cur); + cur = next; + } + } + } +} + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests { + use super::{Data, Empty, Inconsistent, Queue}; + use crate::sync::mpsc::channel; + use crate::sync::Arc; + use crate::thread; + + #[test] + fn test_full() { + let q: Queue<Box<_>> = Queue::new(); + q.push(box 1); + q.push(box 2); + } + + #[test] + fn test() { + let nthreads = 8; + let nmsgs = 1000; + let q = Queue::new(); + match q.pop() { + Empty => {} + Inconsistent | Data(..) => panic!(), + } + let (tx, rx) = channel(); + let q = Arc::new(q); + + for _ in 0..nthreads { + let tx = tx.clone(); + let q = q.clone(); + thread::spawn(move || { + for i in 0..nmsgs { + q.push(i); + } + tx.send(()).unwrap(); + }); + } + + let mut i = 0; + while i < nthreads * nmsgs { + match q.pop() { + Empty | Inconsistent => {} + Data(_) => i += 1, + } + } + drop(tx); + for _ in 0..nthreads { + rx.recv().unwrap(); + } + } +} diff --git a/library/std/src/sync/mpsc/oneshot.rs b/library/std/src/sync/mpsc/oneshot.rs new file mode 100644 index 00000000000..75f5621fa12 --- /dev/null +++ b/library/std/src/sync/mpsc/oneshot.rs @@ -0,0 +1,307 @@ +/// Oneshot channels/ports +/// +/// This is the initial flavor of channels/ports used for comm module. This is +/// an optimization for the one-use case of a channel. The major optimization of +/// this type is to have one and exactly one allocation when the chan/port pair +/// is created. +/// +/// Another possible optimization would be to not use an Arc box because +/// in theory we know when the shared packet can be deallocated (no real need +/// for the atomic reference counting), but I was having trouble how to destroy +/// the data early in a drop of a Port. +/// +/// # Implementation +/// +/// Oneshots are implemented around one atomic usize variable. This variable +/// indicates both the state of the port/chan but also contains any threads +/// blocked on the port. All atomic operations happen on this one word. +/// +/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect +/// on behalf of the channel side of things (it can be mentally thought of as +/// consuming the port). This upgrade is then also stored in the shared packet. +/// The one caveat to consider is that when a port sees a disconnected channel +/// it must check for data because there is no "data plus upgrade" state. +pub use self::Failure::*; +use self::MyUpgrade::*; +pub use self::UpgradeResult::*; + +use crate::cell::UnsafeCell; +use crate::ptr; +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::mpsc::blocking::{self, SignalToken}; +use crate::sync::mpsc::Receiver; +use crate::time::Instant; + +// Various states you can find a port in. +const EMPTY: usize = 0; // initial state: no data, no blocked receiver +const DATA: usize = 1; // data ready for receiver to take +const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded +// Any other value represents a pointer to a SignalToken value. The +// protocol ensures that when the state moves *to* a pointer, +// ownership of the token is given to the packet, and when the state +// moves *from* a pointer, ownership of the token is transferred to +// whoever changed the state. + +pub struct Packet<T> { + // Internal state of the chan/port pair (stores the blocked thread as well) + state: AtomicUsize, + // One-shot data slot location + data: UnsafeCell<Option<T>>, + // when used for the second time, a oneshot channel must be upgraded, and + // this contains the slot for the upgrade + upgrade: UnsafeCell<MyUpgrade<T>>, +} + +pub enum Failure<T> { + Empty, + Disconnected, + Upgraded(Receiver<T>), +} + +pub enum UpgradeResult { + UpSuccess, + UpDisconnected, + UpWoke(SignalToken), +} + +enum MyUpgrade<T> { + NothingSent, + SendUsed, + GoUp(Receiver<T>), +} + +impl<T> Packet<T> { + pub fn new() -> Packet<T> { + Packet { + data: UnsafeCell::new(None), + upgrade: UnsafeCell::new(NothingSent), + state: AtomicUsize::new(EMPTY), + } + } + + pub fn send(&self, t: T) -> Result<(), T> { + unsafe { + // Sanity check + match *self.upgrade.get() { + NothingSent => {} + _ => panic!("sending on a oneshot that's already sent on "), + } + assert!((*self.data.get()).is_none()); + ptr::write(self.data.get(), Some(t)); + ptr::write(self.upgrade.get(), SendUsed); + + match self.state.swap(DATA, Ordering::SeqCst) { + // Sent the data, no one was waiting + EMPTY => Ok(()), + + // Couldn't send the data, the port hung up first. Return the data + // back up the stack. + DISCONNECTED => { + self.state.swap(DISCONNECTED, Ordering::SeqCst); + ptr::write(self.upgrade.get(), NothingSent); + Err((&mut *self.data.get()).take().unwrap()) + } + + // Not possible, these are one-use channels + DATA => unreachable!(), + + // There is a thread waiting on the other end. We leave the 'DATA' + // state inside so it'll pick it up on the other end. + ptr => { + SignalToken::cast_from_usize(ptr).signal(); + Ok(()) + } + } + } + } + + // Just tests whether this channel has been sent on or not, this is only + // safe to use from the sender. + pub fn sent(&self) -> bool { + unsafe { !matches!(*self.upgrade.get(), NothingSent) } + } + + pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> { + // Attempt to not block the thread (it's a little expensive). If it looks + // like we're not empty, then immediately go through to `try_recv`. + if self.state.load(Ordering::SeqCst) == EMPTY { + let (wait_token, signal_token) = blocking::tokens(); + let ptr = unsafe { signal_token.cast_to_usize() }; + + // race with senders to enter the blocking state + if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY { + if let Some(deadline) = deadline { + let timed_out = !wait_token.wait_max_until(deadline); + // Try to reset the state + if timed_out { + self.abort_selection().map_err(Upgraded)?; + } + } else { + wait_token.wait(); + debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY); + } + } else { + // drop the signal token, since we never blocked + drop(unsafe { SignalToken::cast_from_usize(ptr) }); + } + } + + self.try_recv() + } + + pub fn try_recv(&self) -> Result<T, Failure<T>> { + unsafe { + match self.state.load(Ordering::SeqCst) { + EMPTY => Err(Empty), + + // We saw some data on the channel, but the channel can be used + // again to send us an upgrade. As a result, we need to re-insert + // into the channel that there's no data available (otherwise we'll + // just see DATA next time). This is done as a cmpxchg because if + // the state changes under our feet we'd rather just see that state + // change. + DATA => { + self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst); + match (&mut *self.data.get()).take() { + Some(data) => Ok(data), + None => unreachable!(), + } + } + + // There's no guarantee that we receive before an upgrade happens, + // and an upgrade flags the channel as disconnected, so when we see + // this we first need to check if there's data available and *then* + // we go through and process the upgrade. + DISCONNECTED => match (&mut *self.data.get()).take() { + Some(data) => Ok(data), + None => match ptr::replace(self.upgrade.get(), SendUsed) { + SendUsed | NothingSent => Err(Disconnected), + GoUp(upgrade) => Err(Upgraded(upgrade)), + }, + }, + + // We are the sole receiver; there cannot be a blocking + // receiver already. + _ => unreachable!(), + } + } + } + + // Returns whether the upgrade was completed. If the upgrade wasn't + // completed, then the port couldn't get sent to the other half (it will + // never receive it). + pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult { + unsafe { + let prev = match *self.upgrade.get() { + NothingSent => NothingSent, + SendUsed => SendUsed, + _ => panic!("upgrading again"), + }; + ptr::write(self.upgrade.get(), GoUp(up)); + + match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + // If the channel is empty or has data on it, then we're good to go. + // Senders will check the data before the upgrade (in case we + // plastered over the DATA state). + DATA | EMPTY => UpSuccess, + + // If the other end is already disconnected, then we failed the + // upgrade. Be sure to trash the port we were given. + DISCONNECTED => { + ptr::replace(self.upgrade.get(), prev); + UpDisconnected + } + + // If someone's waiting, we gotta wake them up + ptr => UpWoke(SignalToken::cast_from_usize(ptr)), + } + } + } + + pub fn drop_chan(&self) { + match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + DATA | DISCONNECTED | EMPTY => {} + + // If someone's waiting, we gotta wake them up + ptr => unsafe { + SignalToken::cast_from_usize(ptr).signal(); + }, + } + } + + pub fn drop_port(&self) { + match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + // An empty channel has nothing to do, and a remotely disconnected + // channel also has nothing to do b/c we're about to run the drop + // glue + DISCONNECTED | EMPTY => {} + + // There's data on the channel, so make sure we destroy it promptly. + // This is why not using an arc is a little difficult (need the box + // to stay valid while we take the data). + DATA => unsafe { + (&mut *self.data.get()).take().unwrap(); + }, + + // We're the only ones that can block on this port + _ => unreachable!(), + } + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // Remove a previous selecting thread from this port. This ensures that the + // blocked thread will no longer be visible to any other threads. + // + // The return value indicates whether there's data on this port. + pub fn abort_selection(&self) -> Result<bool, Receiver<T>> { + let state = match self.state.load(Ordering::SeqCst) { + // Each of these states means that no further activity will happen + // with regard to abortion selection + s @ (EMPTY | DATA | DISCONNECTED) => s, + + // If we've got a blocked thread, then use an atomic to gain ownership + // of it (may fail) + ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst), + }; + + // Now that we've got ownership of our state, figure out what to do + // about it. + match state { + EMPTY => unreachable!(), + // our thread used for select was stolen + DATA => Ok(true), + + // If the other end has hung up, then we have complete ownership + // of the port. First, check if there was data waiting for us. This + // is possible if the other end sent something and then hung up. + // + // We then need to check to see if there was an upgrade requested, + // and if so, the upgraded port needs to have its selection aborted. + DISCONNECTED => unsafe { + if (*self.data.get()).is_some() { + Ok(true) + } else { + match ptr::replace(self.upgrade.get(), SendUsed) { + GoUp(port) => Err(port), + _ => Ok(true), + } + } + }, + + // We woke ourselves up from select. + ptr => unsafe { + drop(SignalToken::cast_from_usize(ptr)); + Ok(false) + }, + } + } +} + +impl<T> Drop for Packet<T> { + fn drop(&mut self) { + assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED); + } +} diff --git a/library/std/src/sync/mpsc/shared.rs b/library/std/src/sync/mpsc/shared.rs new file mode 100644 index 00000000000..898654f21f2 --- /dev/null +++ b/library/std/src/sync/mpsc/shared.rs @@ -0,0 +1,489 @@ +/// Shared channels. +/// +/// This is the flavor of channels which are not necessarily optimized for any +/// particular use case, but are the most general in how they are used. Shared +/// channels are cloneable allowing for multiple senders. +/// +/// High level implementation details can be found in the comment of the parent +/// module. You'll also note that the implementation of the shared and stream +/// channels are quite similar, and this is no coincidence! +pub use self::Failure::*; +use self::StartResult::*; + +use core::cmp; +use core::intrinsics::abort; + +use crate::cell::UnsafeCell; +use crate::ptr; +use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; +use crate::sync::mpsc::blocking::{self, SignalToken}; +use crate::sync::mpsc::mpsc_queue as mpsc; +use crate::sync::{Mutex, MutexGuard}; +use crate::thread; +use crate::time::Instant; + +const DISCONNECTED: isize = isize::MIN; +const FUDGE: isize = 1024; +const MAX_REFCOUNT: usize = (isize::MAX) as usize; +#[cfg(test)] +const MAX_STEALS: isize = 5; +#[cfg(not(test))] +const MAX_STEALS: isize = 1 << 20; + +pub struct Packet<T> { + queue: mpsc::Queue<T>, + cnt: AtomicIsize, // How many items are on this channel + steals: UnsafeCell<isize>, // How many times has a port received without blocking? + to_wake: AtomicUsize, // SignalToken for wake up + + // The number of channels which are currently using this packet. + channels: AtomicUsize, + + // See the discussion in Port::drop and the channel send methods for what + // these are used for + port_dropped: AtomicBool, + sender_drain: AtomicIsize, + + // this lock protects various portions of this implementation during + // select() + select_lock: Mutex<()>, +} + +pub enum Failure { + Empty, + Disconnected, +} + +#[derive(PartialEq, Eq)] +enum StartResult { + Installed, + Abort, +} + +impl<T> Packet<T> { + // Creation of a packet *must* be followed by a call to postinit_lock + // and later by inherit_blocker + pub fn new() -> Packet<T> { + Packet { + queue: mpsc::Queue::new(), + cnt: AtomicIsize::new(0), + steals: UnsafeCell::new(0), + to_wake: AtomicUsize::new(0), + channels: AtomicUsize::new(2), + port_dropped: AtomicBool::new(false), + sender_drain: AtomicIsize::new(0), + select_lock: Mutex::new(()), + } + } + + // This function should be used after newly created Packet + // was wrapped with an Arc + // In other case mutex data will be duplicated while cloning + // and that could cause problems on platforms where it is + // represented by opaque data structure + pub fn postinit_lock(&self) -> MutexGuard<'_, ()> { + self.select_lock.lock().unwrap() + } + + // This function is used at the creation of a shared packet to inherit a + // previously blocked thread. This is done to prevent spurious wakeups of + // threads in select(). + // + // This can only be called at channel-creation time + pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) { + if let Some(token) = token { + assert_eq!(self.cnt.load(Ordering::SeqCst), 0); + assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst); + self.cnt.store(-1, Ordering::SeqCst); + + // This store is a little sketchy. What's happening here is that + // we're transferring a blocker from a oneshot or stream channel to + // this shared channel. In doing so, we never spuriously wake them + // up and rather only wake them up at the appropriate time. This + // implementation of shared channels assumes that any blocking + // recv() will undo the increment of steals performed in try_recv() + // once the recv is complete. This thread that we're inheriting, + // however, is not in the middle of recv. Hence, the first time we + // wake them up, they're going to wake up from their old port, move + // on to the upgraded port, and then call the block recv() function. + // + // When calling this function, they'll find there's data immediately + // available, counting it as a steal. This in fact wasn't a steal + // because we appropriately blocked them waiting for data. + // + // To offset this bad increment, we initially set the steal count to + // -1. You'll find some special code in abort_selection() as well to + // ensure that this -1 steal count doesn't escape too far. + unsafe { + *self.steals.get() = -1; + } + } + + // When the shared packet is constructed, we grabbed this lock. The + // purpose of this lock is to ensure that abort_selection() doesn't + // interfere with this method. After we unlock this lock, we're + // signifying that we're done modifying self.cnt and self.to_wake and + // the port is ready for the world to continue using it. + drop(guard); + } + + pub fn send(&self, t: T) -> Result<(), T> { + // See Port::drop for what's going on + if self.port_dropped.load(Ordering::SeqCst) { + return Err(t); + } + + // Note that the multiple sender case is a little trickier + // semantically than the single sender case. The logic for + // incrementing is "add and if disconnected store disconnected". + // This could end up leading some senders to believe that there + // wasn't a disconnect if in fact there was a disconnect. This means + // that while one thread is attempting to re-store the disconnected + // states, other threads could walk through merrily incrementing + // this very-negative disconnected count. To prevent senders from + // spuriously attempting to send when the channels is actually + // disconnected, the count has a ranged check here. + // + // This is also done for another reason. Remember that the return + // value of this function is: + // + // `true` == the data *may* be received, this essentially has no + // meaning + // `false` == the data will *never* be received, this has a lot of + // meaning + // + // In the SPSC case, we have a check of 'queue.is_empty()' to see + // whether the data was actually received, but this same condition + // means nothing in a multi-producer context. As a result, this + // preflight check serves as the definitive "this will never be + // received". Once we get beyond this check, we have permanently + // entered the realm of "this may be received" + if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE { + return Err(t); + } + + self.queue.push(t); + match self.cnt.fetch_add(1, Ordering::SeqCst) { + -1 => { + self.take_to_wake().signal(); + } + + // In this case, we have possibly failed to send our data, and + // we need to consider re-popping the data in order to fully + // destroy it. We must arbitrate among the multiple senders, + // however, because the queues that we're using are + // single-consumer queues. In order to do this, all exiting + // pushers will use an atomic count in order to count those + // flowing through. Pushers who see 0 are required to drain as + // much as possible, and then can only exit when they are the + // only pusher (otherwise they must try again). + n if n < DISCONNECTED + FUDGE => { + // see the comment in 'try' for a shared channel for why this + // window of "not disconnected" is ok. + self.cnt.store(DISCONNECTED, Ordering::SeqCst); + + if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 { + loop { + // drain the queue, for info on the thread yield see the + // discussion in try_recv + loop { + match self.queue.pop() { + mpsc::Data(..) => {} + mpsc::Empty => break, + mpsc::Inconsistent => thread::yield_now(), + } + } + // maybe we're done, if we're not the last ones + // here, then we need to go try again. + if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 { + break; + } + } + + // At this point, there may still be data on the queue, + // but only if the count hasn't been incremented and + // some other sender hasn't finished pushing data just + // yet. That sender in question will drain its own data. + } + } + + // Can't make any assumptions about this case like in the SPSC case. + _ => {} + } + + Ok(()) + } + + pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> { + // This code is essentially the exact same as that found in the stream + // case (see stream.rs) + match self.try_recv() { + Err(Empty) => {} + data => return data, + } + + let (wait_token, signal_token) = blocking::tokens(); + if self.decrement(signal_token) == Installed { + if let Some(deadline) = deadline { + let timed_out = !wait_token.wait_max_until(deadline); + if timed_out { + self.abort_selection(false); + } + } else { + wait_token.wait(); + } + } + + match self.try_recv() { + data @ Ok(..) => unsafe { + *self.steals.get() -= 1; + data + }, + data => data, + } + } + + // Essentially the exact same thing as the stream decrement function. + // Returns true if blocking should proceed. + fn decrement(&self, token: SignalToken) -> StartResult { + unsafe { + assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + let ptr = token.cast_to_usize(); + self.to_wake.store(ptr, Ordering::SeqCst); + + let steals = ptr::replace(self.steals.get(), 0); + + match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, Ordering::SeqCst); + } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { + return Installed; + } + } + } + + self.to_wake.store(0, Ordering::SeqCst); + drop(SignalToken::cast_from_usize(ptr)); + Abort + } + } + + pub fn try_recv(&self) -> Result<T, Failure> { + let ret = match self.queue.pop() { + mpsc::Data(t) => Some(t), + mpsc::Empty => None, + + // This is a bit of an interesting case. The channel is reported as + // having data available, but our pop() has failed due to the queue + // being in an inconsistent state. This means that there is some + // pusher somewhere which has yet to complete, but we are guaranteed + // that a pop will eventually succeed. In this case, we spin in a + // yield loop because the remote sender should finish their enqueue + // operation "very quickly". + // + // Avoiding this yield loop would require a different queue + // abstraction which provides the guarantee that after M pushes have + // succeeded, at least M pops will succeed. The current queues + // guarantee that if there are N active pushes, you can pop N times + // once all N have finished. + mpsc::Inconsistent => { + let data; + loop { + thread::yield_now(); + match self.queue.pop() { + mpsc::Data(t) => { + data = t; + break; + } + mpsc::Empty => panic!("inconsistent => empty"), + mpsc::Inconsistent => {} + } + } + Some(data) + } + }; + match ret { + // See the discussion in the stream implementation for why we + // might decrement steals. + Some(data) => unsafe { + if *self.steals.get() > MAX_STEALS { + match self.cnt.swap(0, Ordering::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, Ordering::SeqCst); + } + n => { + let m = cmp::min(n, *self.steals.get()); + *self.steals.get() -= m; + self.bump(n - m); + } + } + assert!(*self.steals.get() >= 0); + } + *self.steals.get() += 1; + Ok(data) + }, + + // See the discussion in the stream implementation for why we try + // again. + None => { + match self.cnt.load(Ordering::SeqCst) { + n if n != DISCONNECTED => Err(Empty), + _ => { + match self.queue.pop() { + mpsc::Data(t) => Ok(t), + mpsc::Empty => Err(Disconnected), + // with no senders, an inconsistency is impossible. + mpsc::Inconsistent => unreachable!(), + } + } + } + } + } + } + + // Prepares this shared packet for a channel clone, essentially just bumping + // a refcount. + pub fn clone_chan(&self) { + let old_count = self.channels.fetch_add(1, Ordering::SeqCst); + + // See comments on Arc::clone() on why we do this (for `mem::forget`). + if old_count > MAX_REFCOUNT { + abort(); + } + } + + // Decrement the reference count on a channel. This is called whenever a + // Chan is dropped and may end up waking up a receiver. It's the receiver's + // responsibility on the other end to figure out that we've disconnected. + pub fn drop_chan(&self) { + match self.channels.fetch_sub(1, Ordering::SeqCst) { + 1 => {} + n if n > 1 => return, + n => panic!("bad number of channels left {}", n), + } + + match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) { + -1 => { + self.take_to_wake().signal(); + } + DISCONNECTED => {} + n => { + assert!(n >= 0); + } + } + } + + // See the long discussion inside of stream.rs for why the queue is drained, + // and why it is done in this fashion. + pub fn drop_port(&self) { + self.port_dropped.store(true, Ordering::SeqCst); + let mut steals = unsafe { *self.steals.get() }; + while { + let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst); + cnt != DISCONNECTED && cnt != steals + } { + // See the discussion in 'try_recv' for why we yield + // control of this thread. + loop { + match self.queue.pop() { + mpsc::Data(..) => { + steals += 1; + } + mpsc::Empty | mpsc::Inconsistent => break, + } + } + } + } + + // Consumes ownership of the 'to_wake' field. + fn take_to_wake(&self) -> SignalToken { + let ptr = self.to_wake.load(Ordering::SeqCst); + self.to_wake.store(0, Ordering::SeqCst); + assert!(ptr != 0); + unsafe { SignalToken::cast_from_usize(ptr) } + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // increment the count on the channel (used for selection) + fn bump(&self, amt: isize) -> isize { + match self.cnt.fetch_add(amt, Ordering::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, Ordering::SeqCst); + DISCONNECTED + } + n => n, + } + } + + // Cancels a previous thread waiting on this port, returning whether there's + // data on the port. + // + // This is similar to the stream implementation (hence fewer comments), but + // uses a different value for the "steals" variable. + pub fn abort_selection(&self, _was_upgrade: bool) -> bool { + // Before we do anything else, we bounce on this lock. The reason for + // doing this is to ensure that any upgrade-in-progress is gone and + // done with. Without this bounce, we can race with inherit_blocker + // about looking at and dealing with to_wake. Once we have acquired the + // lock, we are guaranteed that inherit_blocker is done. + { + let _guard = self.select_lock.lock().unwrap(); + } + + // Like the stream implementation, we want to make sure that the count + // on the channel goes non-negative. We don't know how negative the + // stream currently is, so instead of using a steal value of 1, we load + // the channel count and figure out what we should do to make it + // positive. + let steals = { + let cnt = self.cnt.load(Ordering::SeqCst); + if cnt < 0 && cnt != DISCONNECTED { -cnt } else { 0 } + }; + let prev = self.bump(steals + 1); + + if prev == DISCONNECTED { + assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + true + } else { + let cur = prev + steals + 1; + assert!(cur >= 0); + if prev < 0 { + drop(self.take_to_wake()); + } else { + while self.to_wake.load(Ordering::SeqCst) != 0 { + thread::yield_now(); + } + } + unsafe { + // if the number of steals is -1, it was the pre-emptive -1 steal + // count from when we inherited a blocker. This is fine because + // we're just going to overwrite it with a real value. + let old = self.steals.get(); + assert!(*old == 0 || *old == -1); + *old = steals; + prev >= 0 + } + } + } +} + +impl<T> Drop for Packet<T> { + fn drop(&mut self) { + // Note that this load is not only an assert for correctness about + // disconnection, but also a proper fence before the read of + // `to_wake`, so this assert cannot be removed with also removing + // the `to_wake` assert. + assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED); + assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.channels.load(Ordering::SeqCst), 0); + } +} diff --git a/library/std/src/sync/mpsc/spsc_queue.rs b/library/std/src/sync/mpsc/spsc_queue.rs new file mode 100644 index 00000000000..0274268f69f --- /dev/null +++ b/library/std/src/sync/mpsc/spsc_queue.rs @@ -0,0 +1,338 @@ +//! A single-producer single-consumer concurrent queue +//! +//! This module contains the implementation of an SPSC queue which can be used +//! concurrently between two threads. This data structure is safe to use and +//! enforces the semantics that there is one pusher and one popper. + +// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue + +use core::cell::UnsafeCell; +use core::ptr; + +use crate::boxed::Box; +use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; + +use super::cache_aligned::CacheAligned; + +// Node within the linked list queue of messages to send +struct Node<T> { + // FIXME: this could be an uninitialized T if we're careful enough, and + // that would reduce memory usage (and be a bit faster). + // is it worth it? + value: Option<T>, // nullable for re-use of nodes + cached: bool, // This node goes into the node cache + next: AtomicPtr<Node<T>>, // next node in the queue +} + +/// The single-producer single-consumer queue. This structure is not cloneable, +/// but it can be safely shared in an Arc if it is guaranteed that there +/// is only one popper and one pusher touching the queue at any one point in +/// time. +pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> { + // consumer fields + consumer: CacheAligned<Consumer<T, ConsumerAddition>>, + + // producer fields + producer: CacheAligned<Producer<T, ProducerAddition>>, +} + +struct Consumer<T, Addition> { + tail: UnsafeCell<*mut Node<T>>, // where to pop from + tail_prev: AtomicPtr<Node<T>>, // where to pop from + cache_bound: usize, // maximum cache size + cached_nodes: AtomicUsize, // number of nodes marked as cacheable + addition: Addition, +} + +struct Producer<T, Addition> { + head: UnsafeCell<*mut Node<T>>, // where to push to + first: UnsafeCell<*mut Node<T>>, // where to get new nodes from + tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail + addition: Addition, +} + +unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {} + +unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {} + +impl<T> Node<T> { + fn new() -> *mut Node<T> { + Box::into_raw(box Node { + value: None, + cached: false, + next: AtomicPtr::new(ptr::null_mut::<Node<T>>()), + }) + } +} + +impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> { + /// Creates a new queue. With given additional elements in the producer and + /// consumer portions of the queue. + /// + /// Due to the performance implications of cache-contention, + /// we wish to keep fields used mainly by the producer on a separate cache + /// line than those used by the consumer. + /// Since cache lines are usually 64 bytes, it is unreasonably expensive to + /// allocate one for small fields, so we allow users to insert additional + /// fields into the cache lines already allocated by this for the producer + /// and consumer. + /// + /// This is unsafe as the type system doesn't enforce a single + /// consumer-producer relationship. It also allows the consumer to `pop` + /// items while there is a `peek` active due to all methods having a + /// non-mutable receiver. + /// + /// # Arguments + /// + /// * `bound` - This queue implementation is implemented with a linked + /// list, and this means that a push is always a malloc. In + /// order to amortize this cost, an internal cache of nodes is + /// maintained to prevent a malloc from always being + /// necessary. This bound is the limit on the size of the + /// cache (if desired). If the value is 0, then the cache has + /// no bound. Otherwise, the cache will never grow larger than + /// `bound` (although the queue itself could be much larger. + pub unsafe fn with_additions( + bound: usize, + producer_addition: ProducerAddition, + consumer_addition: ConsumerAddition, + ) -> Self { + let n1 = Node::new(); + let n2 = Node::new(); + (*n1).next.store(n2, Ordering::Relaxed); + Queue { + consumer: CacheAligned::new(Consumer { + tail: UnsafeCell::new(n2), + tail_prev: AtomicPtr::new(n1), + cache_bound: bound, + cached_nodes: AtomicUsize::new(0), + addition: consumer_addition, + }), + producer: CacheAligned::new(Producer { + head: UnsafeCell::new(n2), + first: UnsafeCell::new(n1), + tail_copy: UnsafeCell::new(n1), + addition: producer_addition, + }), + } + } + + /// Pushes a new value onto this queue. Note that to use this function + /// safely, it must be externally guaranteed that there is only one pusher. + pub fn push(&self, t: T) { + unsafe { + // Acquire a node (which either uses a cached one or allocates a new + // one), and then append this to the 'head' node. + let n = self.alloc(); + assert!((*n).value.is_none()); + (*n).value = Some(t); + (*n).next.store(ptr::null_mut(), Ordering::Relaxed); + (**self.producer.head.get()).next.store(n, Ordering::Release); + *(&self.producer.head).get() = n; + } + } + + unsafe fn alloc(&self) -> *mut Node<T> { + // First try to see if we can consume the 'first' node for our uses. + if *self.producer.first.get() != *self.producer.tail_copy.get() { + let ret = *self.producer.first.get(); + *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); + return ret; + } + // If the above fails, then update our copy of the tail and try + // again. + *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire); + if *self.producer.first.get() != *self.producer.tail_copy.get() { + let ret = *self.producer.first.get(); + *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); + return ret; + } + // If all of that fails, then we have to allocate a new node + // (there's nothing in the node cache). + Node::new() + } + + /// Attempts to pop a value from this queue. Remember that to use this type + /// safely you must ensure that there is only one popper at a time. + pub fn pop(&self) -> Option<T> { + unsafe { + // The `tail` node is not actually a used node, but rather a + // sentinel from where we should start popping from. Hence, look at + // tail's next field and see if we can use it. If we do a pop, then + // the current tail node is a candidate for going into the cache. + let tail = *self.consumer.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + if next.is_null() { + return None; + } + assert!((*next).value.is_some()); + let ret = (*next).value.take(); + + *self.consumer.0.tail.get() = next; + if self.consumer.cache_bound == 0 { + self.consumer.tail_prev.store(tail, Ordering::Release); + } else { + let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed); + if cached_nodes < self.consumer.cache_bound && !(*tail).cached { + self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed); + (*tail).cached = true; + } + + if (*tail).cached { + self.consumer.tail_prev.store(tail, Ordering::Release); + } else { + (*self.consumer.tail_prev.load(Ordering::Relaxed)) + .next + .store(next, Ordering::Relaxed); + // We have successfully erased all references to 'tail', so + // now we can safely drop it. + let _: Box<Node<T>> = Box::from_raw(tail); + } + } + ret + } + } + + /// Attempts to peek at the head of the queue, returning `None` if the queue + /// has no data currently + /// + /// # Warning + /// The reference returned is invalid if it is not used before the consumer + /// pops the value off the queue. If the producer then pushes another value + /// onto the queue, it will overwrite the value pointed to by the reference. + pub fn peek(&self) -> Option<&mut T> { + // This is essentially the same as above with all the popping bits + // stripped out. + unsafe { + let tail = *self.consumer.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + if next.is_null() { None } else { (*next).value.as_mut() } + } + } + + pub fn producer_addition(&self) -> &ProducerAddition { + &self.producer.addition + } + + pub fn consumer_addition(&self) -> &ConsumerAddition { + &self.consumer.addition + } +} + +impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> { + fn drop(&mut self) { + unsafe { + let mut cur = *self.producer.first.get(); + while !cur.is_null() { + let next = (*cur).next.load(Ordering::Relaxed); + let _n: Box<Node<T>> = Box::from_raw(cur); + cur = next; + } + } + } +} + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests { + use super::Queue; + use crate::sync::mpsc::channel; + use crate::sync::Arc; + use crate::thread; + + #[test] + fn smoke() { + unsafe { + let queue = Queue::with_additions(0, (), ()); + queue.push(1); + queue.push(2); + assert_eq!(queue.pop(), Some(1)); + assert_eq!(queue.pop(), Some(2)); + assert_eq!(queue.pop(), None); + queue.push(3); + queue.push(4); + assert_eq!(queue.pop(), Some(3)); + assert_eq!(queue.pop(), Some(4)); + assert_eq!(queue.pop(), None); + } + } + + #[test] + fn peek() { + unsafe { + let queue = Queue::with_additions(0, (), ()); + queue.push(vec![1]); + + // Ensure the borrowchecker works + match queue.peek() { + Some(vec) => { + assert_eq!(&*vec, &[1]); + } + None => unreachable!(), + } + + match queue.pop() { + Some(vec) => { + assert_eq!(&*vec, &[1]); + } + None => unreachable!(), + } + } + } + + #[test] + fn drop_full() { + unsafe { + let q: Queue<Box<_>> = Queue::with_additions(0, (), ()); + q.push(box 1); + q.push(box 2); + } + } + + #[test] + fn smoke_bound() { + unsafe { + let q = Queue::with_additions(0, (), ()); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); + } + } + + #[test] + fn stress() { + unsafe { + stress_bound(0); + stress_bound(1); + } + + unsafe fn stress_bound(bound: usize) { + let q = Arc::new(Queue::with_additions(bound, (), ())); + + let (tx, rx) = channel(); + let q2 = q.clone(); + let _t = thread::spawn(move || { + for _ in 0..100000 { + loop { + match q2.pop() { + Some(1) => break, + Some(_) => panic!(), + None => {} + } + } + } + tx.send(()).unwrap(); + }); + for _ in 0..100000 { + q.push(1); + } + rx.recv().unwrap(); + } + } +} diff --git a/library/std/src/sync/mpsc/stream.rs b/library/std/src/sync/mpsc/stream.rs new file mode 100644 index 00000000000..9f7c1af8951 --- /dev/null +++ b/library/std/src/sync/mpsc/stream.rs @@ -0,0 +1,453 @@ +/// Stream channels +/// +/// This is the flavor of channels which are optimized for one sender and one +/// receiver. The sender will be upgraded to a shared channel if the channel is +/// cloned. +/// +/// High level implementation details can be found in the comment of the parent +/// module. +pub use self::Failure::*; +use self::Message::*; +pub use self::UpgradeResult::*; + +use core::cmp; + +use crate::cell::UnsafeCell; +use crate::ptr; +use crate::thread; +use crate::time::Instant; + +use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; +use crate::sync::mpsc::blocking::{self, SignalToken}; +use crate::sync::mpsc::spsc_queue as spsc; +use crate::sync::mpsc::Receiver; + +const DISCONNECTED: isize = isize::MIN; +#[cfg(test)] +const MAX_STEALS: isize = 5; +#[cfg(not(test))] +const MAX_STEALS: isize = 1 << 20; + +pub struct Packet<T> { + // internal queue for all messages + queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>, +} + +struct ProducerAddition { + cnt: AtomicIsize, // How many items are on this channel + to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up + + port_dropped: AtomicBool, // flag if the channel has been destroyed. +} + +struct ConsumerAddition { + steals: UnsafeCell<isize>, // How many times has a port received without blocking? +} + +pub enum Failure<T> { + Empty, + Disconnected, + Upgraded(Receiver<T>), +} + +pub enum UpgradeResult { + UpSuccess, + UpDisconnected, + UpWoke(SignalToken), +} + +// Any message could contain an "upgrade request" to a new shared port, so the +// internal queue it's a queue of T, but rather Message<T> +enum Message<T> { + Data(T), + GoUp(Receiver<T>), +} + +impl<T> Packet<T> { + pub fn new() -> Packet<T> { + Packet { + queue: unsafe { + spsc::Queue::with_additions( + 128, + ProducerAddition { + cnt: AtomicIsize::new(0), + to_wake: AtomicUsize::new(0), + + port_dropped: AtomicBool::new(false), + }, + ConsumerAddition { steals: UnsafeCell::new(0) }, + ) + }, + } + } + + pub fn send(&self, t: T) -> Result<(), T> { + // If the other port has deterministically gone away, then definitely + // must return the data back up the stack. Otherwise, the data is + // considered as being sent. + if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { + return Err(t); + } + + match self.do_send(Data(t)) { + UpSuccess | UpDisconnected => {} + UpWoke(token) => { + token.signal(); + } + } + Ok(()) + } + + pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult { + // If the port has gone away, then there's no need to proceed any + // further. + if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { + return UpDisconnected; + } + + self.do_send(GoUp(up)) + } + + fn do_send(&self, t: Message<T>) -> UpgradeResult { + self.queue.push(t); + match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) { + // As described in the mod's doc comment, -1 == wakeup + -1 => UpWoke(self.take_to_wake()), + // As as described before, SPSC queues must be >= -2 + -2 => UpSuccess, + + // Be sure to preserve the disconnected state, and the return value + // in this case is going to be whether our data was received or not. + // This manifests itself on whether we have an empty queue or not. + // + // Primarily, are required to drain the queue here because the port + // will never remove this data. We can only have at most one item to + // drain (the port drains the rest). + DISCONNECTED => { + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); + let first = self.queue.pop(); + let second = self.queue.pop(); + assert!(second.is_none()); + + match first { + Some(..) => UpSuccess, // we failed to send the data + None => UpDisconnected, // we successfully sent data + } + } + + // Otherwise we just sent some data on a non-waiting queue, so just + // make sure the world is sane and carry on! + n => { + assert!(n >= 0); + UpSuccess + } + } + } + + // Consumes ownership of the 'to_wake' field. + fn take_to_wake(&self) -> SignalToken { + let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst); + self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst); + assert!(ptr != 0); + unsafe { SignalToken::cast_from_usize(ptr) } + } + + // Decrements the count on the channel for a sleeper, returning the sleeper + // back if it shouldn't sleep. Note that this is the location where we take + // steals into account. + fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> { + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); + let ptr = unsafe { token.cast_to_usize() }; + self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst); + + let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) }; + + match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) { + DISCONNECTED => { + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); + } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { + return Ok(()); + } + } + } + + self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst); + Err(unsafe { SignalToken::cast_from_usize(ptr) }) + } + + pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> { + // Optimistic preflight check (scheduling is expensive). + match self.try_recv() { + Err(Empty) => {} + data => return data, + } + + // Welp, our channel has no data. Deschedule the current thread and + // initiate the blocking protocol. + let (wait_token, signal_token) = blocking::tokens(); + if self.decrement(signal_token).is_ok() { + if let Some(deadline) = deadline { + let timed_out = !wait_token.wait_max_until(deadline); + if timed_out { + self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded)?; + } + } else { + wait_token.wait(); + } + } + + match self.try_recv() { + // Messages which actually popped from the queue shouldn't count as + // a steal, so offset the decrement here (we already have our + // "steal" factored into the channel count above). + data @ (Ok(..) | Err(Upgraded(..))) => unsafe { + *self.queue.consumer_addition().steals.get() -= 1; + data + }, + + data => data, + } + } + + pub fn try_recv(&self) -> Result<T, Failure<T>> { + match self.queue.pop() { + // If we stole some data, record to that effect (this will be + // factored into cnt later on). + // + // Note that we don't allow steals to grow without bound in order to + // prevent eventual overflow of either steals or cnt as an overflow + // would have catastrophic results. Sometimes, steals > cnt, but + // other times cnt > steals, so we don't know the relation between + // steals and cnt. This code path is executed only rarely, so we do + // a pretty slow operation, of swapping 0 into cnt, taking steals + // down as much as possible (without going negative), and then + // adding back in whatever we couldn't factor into steals. + Some(data) => unsafe { + if *self.queue.consumer_addition().steals.get() > MAX_STEALS { + match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) { + DISCONNECTED => { + self.queue + .producer_addition() + .cnt + .store(DISCONNECTED, Ordering::SeqCst); + } + n => { + let m = cmp::min(n, *self.queue.consumer_addition().steals.get()); + *self.queue.consumer_addition().steals.get() -= m; + self.bump(n - m); + } + } + assert!(*self.queue.consumer_addition().steals.get() >= 0); + } + *self.queue.consumer_addition().steals.get() += 1; + match data { + Data(t) => Ok(t), + GoUp(up) => Err(Upgraded(up)), + } + }, + + None => { + match self.queue.producer_addition().cnt.load(Ordering::SeqCst) { + n if n != DISCONNECTED => Err(Empty), + + // This is a little bit of a tricky case. We failed to pop + // data above, and then we have viewed that the channel is + // disconnected. In this window more data could have been + // sent on the channel. It doesn't really make sense to + // return that the channel is disconnected when there's + // actually data on it, so be extra sure there's no data by + // popping one more time. + // + // We can ignore steals because the other end is + // disconnected and we'll never need to really factor in our + // steals again. + _ => match self.queue.pop() { + Some(Data(t)) => Ok(t), + Some(GoUp(up)) => Err(Upgraded(up)), + None => Err(Disconnected), + }, + } + } + } + } + + pub fn drop_chan(&self) { + // Dropping a channel is pretty simple, we just flag it as disconnected + // and then wakeup a blocker if there is one. + match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) { + -1 => { + self.take_to_wake().signal(); + } + DISCONNECTED => {} + n => { + assert!(n >= 0); + } + } + } + + pub fn drop_port(&self) { + // Dropping a port seems like a fairly trivial thing. In theory all we + // need to do is flag that we're disconnected and then everything else + // can take over (we don't have anyone to wake up). + // + // The catch for Ports is that we want to drop the entire contents of + // the queue. There are multiple reasons for having this property, the + // largest of which is that if another chan is waiting in this channel + // (but not received yet), then waiting on that port will cause a + // deadlock. + // + // So if we accept that we must now destroy the entire contents of the + // queue, this code may make a bit more sense. The tricky part is that + // we can't let any in-flight sends go un-dropped, we have to make sure + // *everything* is dropped and nothing new will come onto the channel. + + // The first thing we do is set a flag saying that we're done for. All + // sends are gated on this flag, so we're immediately guaranteed that + // there are a bounded number of active sends that we'll have to deal + // with. + self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst); + + // Now that we're guaranteed to deal with a bounded number of senders, + // we need to drain the queue. This draining process happens atomically + // with respect to the "count" of the channel. If the count is nonzero + // (with steals taken into account), then there must be data on the + // channel. In this case we drain everything and then try again. We will + // continue to fail while active senders send data while we're dropping + // data, but eventually we're guaranteed to break out of this loop + // (because there is a bounded number of senders). + let mut steals = unsafe { *self.queue.consumer_addition().steals.get() }; + while { + let cnt = self.queue.producer_addition().cnt.compare_and_swap( + steals, + DISCONNECTED, + Ordering::SeqCst, + ); + cnt != DISCONNECTED && cnt != steals + } { + while self.queue.pop().is_some() { + steals += 1; + } + } + + // At this point in time, we have gated all future senders from sending, + // and we have flagged the channel as being disconnected. The senders + // still have some responsibility, however, because some sends may not + // complete until after we flag the disconnection. There are more + // details in the sending methods that see DISCONNECTED + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // increment the count on the channel (used for selection) + fn bump(&self, amt: isize) -> isize { + match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) { + DISCONNECTED => { + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); + DISCONNECTED + } + n => n, + } + } + + // Removes a previous thread from being blocked in this port + pub fn abort_selection(&self, was_upgrade: bool) -> Result<bool, Receiver<T>> { + // If we're aborting selection after upgrading from a oneshot, then + // we're guarantee that no one is waiting. The only way that we could + // have seen the upgrade is if data was actually sent on the channel + // half again. For us, this means that there is guaranteed to be data on + // this channel. Furthermore, we're guaranteed that there was no + // start_selection previously, so there's no need to modify `self.cnt` + // at all. + // + // Hence, because of these invariants, we immediately return `Ok(true)`. + // Note that the data may not actually be sent on the channel just yet. + // The other end could have flagged the upgrade but not sent data to + // this end. This is fine because we know it's a small bounded windows + // of time until the data is actually sent. + if was_upgrade { + assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); + return Ok(true); + } + + // We want to make sure that the count on the channel goes non-negative, + // and in the stream case we can have at most one steal, so just assume + // that we had one steal. + let steals = 1; + let prev = self.bump(steals + 1); + + // If we were previously disconnected, then we know for sure that there + // is no thread in to_wake, so just keep going + let has_data = if prev == DISCONNECTED { + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); + true // there is data, that data is that we're disconnected + } else { + let cur = prev + steals + 1; + assert!(cur >= 0); + + // If the previous count was negative, then we just made things go + // positive, hence we passed the -1 boundary and we're responsible + // for removing the to_wake() field and trashing it. + // + // If the previous count was positive then we're in a tougher + // situation. A possible race is that a sender just incremented + // through -1 (meaning it's going to try to wake a thread up), but it + // hasn't yet read the to_wake. In order to prevent a future recv() + // from waking up too early (this sender picking up the plastered + // over to_wake), we spin loop here waiting for to_wake to be 0. + // Note that this entire select() implementation needs an overhaul, + // and this is *not* the worst part of it, so this is not done as a + // final solution but rather out of necessity for now to get + // something working. + if prev < 0 { + drop(self.take_to_wake()); + } else { + while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 { + thread::yield_now(); + } + } + unsafe { + assert_eq!(*self.queue.consumer_addition().steals.get(), 0); + *self.queue.consumer_addition().steals.get() = steals; + } + + // if we were previously positive, then there's surely data to + // receive + prev >= 0 + }; + + // Now that we've determined that this queue "has data", we peek at the + // queue to see if the data is an upgrade or not. If it's an upgrade, + // then we need to destroy this port and abort selection on the + // upgraded port. + if has_data { + match self.queue.peek() { + Some(&mut GoUp(..)) => match self.queue.pop() { + Some(GoUp(port)) => Err(port), + _ => unreachable!(), + }, + _ => Ok(true), + } + } else { + Ok(false) + } + } +} + +impl<T> Drop for Packet<T> { + fn drop(&mut self) { + // Note that this load is not only an assert for correctness about + // disconnection, but also a proper fence before the read of + // `to_wake`, so this assert cannot be removed with also removing + // the `to_wake` assert. + assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); + } +} diff --git a/library/std/src/sync/mpsc/sync.rs b/library/std/src/sync/mpsc/sync.rs new file mode 100644 index 00000000000..733761671a0 --- /dev/null +++ b/library/std/src/sync/mpsc/sync.rs @@ -0,0 +1,495 @@ +use self::Blocker::*; +/// Synchronous channels/ports +/// +/// This channel implementation differs significantly from the asynchronous +/// implementations found next to it (oneshot/stream/share). This is an +/// implementation of a synchronous, bounded buffer channel. +/// +/// Each channel is created with some amount of backing buffer, and sends will +/// *block* until buffer space becomes available. A buffer size of 0 is valid, +/// which means that every successful send is paired with a successful recv. +/// +/// This flavor of channels defines a new `send_opt` method for channels which +/// is the method by which a message is sent but the thread does not panic if it +/// cannot be delivered. +/// +/// Another major difference is that send() will *always* return back the data +/// if it couldn't be sent. This is because it is deterministically known when +/// the data is received and when it is not received. +/// +/// Implementation-wise, it can all be summed up with "use a mutex plus some +/// logic". The mutex used here is an OS native mutex, meaning that no user code +/// is run inside of the mutex (to prevent context switching). This +/// implementation shares almost all code for the buffered and unbuffered cases +/// of a synchronous channel. There are a few branches for the unbuffered case, +/// but they're mostly just relevant to blocking senders. +pub use self::Failure::*; + +use core::intrinsics::abort; +use core::mem; +use core::ptr; + +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken}; +use crate::sync::{Mutex, MutexGuard}; +use crate::time::Instant; + +const MAX_REFCOUNT: usize = (isize::MAX) as usize; + +pub struct Packet<T> { + /// Only field outside of the mutex. Just done for kicks, but mainly because + /// the other shared channel already had the code implemented + channels: AtomicUsize, + + lock: Mutex<State<T>>, +} + +unsafe impl<T: Send> Send for Packet<T> {} + +unsafe impl<T: Send> Sync for Packet<T> {} + +struct State<T> { + disconnected: bool, // Is the channel disconnected yet? + queue: Queue, // queue of senders waiting to send data + blocker: Blocker, // currently blocked thread on this channel + buf: Buffer<T>, // storage for buffered messages + cap: usize, // capacity of this channel + + /// A curious flag used to indicate whether a sender failed or succeeded in + /// blocking. This is used to transmit information back to the thread that it + /// must dequeue its message from the buffer because it was not received. + /// This is only relevant in the 0-buffer case. This obviously cannot be + /// safely constructed, but it's guaranteed to always have a valid pointer + /// value. + canceled: Option<&'static mut bool>, +} + +unsafe impl<T: Send> Send for State<T> {} + +/// Possible flavors of threads who can be blocked on this channel. +enum Blocker { + BlockedSender(SignalToken), + BlockedReceiver(SignalToken), + NoneBlocked, +} + +/// Simple queue for threading threads together. Nodes are stack-allocated, so +/// this structure is not safe at all +struct Queue { + head: *mut Node, + tail: *mut Node, +} + +struct Node { + token: Option<SignalToken>, + next: *mut Node, +} + +unsafe impl Send for Node {} + +/// A simple ring-buffer +struct Buffer<T> { + buf: Vec<Option<T>>, + start: usize, + size: usize, +} + +#[derive(Debug)] +pub enum Failure { + Empty, + Disconnected, +} + +/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock` +/// in the meantime. This re-locks the mutex upon returning. +fn wait<'a, 'b, T>( + lock: &'a Mutex<State<T>>, + mut guard: MutexGuard<'b, State<T>>, + f: fn(SignalToken) -> Blocker, +) -> MutexGuard<'a, State<T>> { + let (wait_token, signal_token) = blocking::tokens(); + match mem::replace(&mut guard.blocker, f(signal_token)) { + NoneBlocked => {} + _ => unreachable!(), + } + drop(guard); // unlock + wait_token.wait(); // block + lock.lock().unwrap() // relock +} + +/// Same as wait, but waiting at most until `deadline`. +fn wait_timeout_receiver<'a, 'b, T>( + lock: &'a Mutex<State<T>>, + deadline: Instant, + mut guard: MutexGuard<'b, State<T>>, + success: &mut bool, +) -> MutexGuard<'a, State<T>> { + let (wait_token, signal_token) = blocking::tokens(); + match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) { + NoneBlocked => {} + _ => unreachable!(), + } + drop(guard); // unlock + *success = wait_token.wait_max_until(deadline); // block + let mut new_guard = lock.lock().unwrap(); // relock + if !*success { + abort_selection(&mut new_guard); + } + new_guard +} + +fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool { + match mem::replace(&mut guard.blocker, NoneBlocked) { + NoneBlocked => true, + BlockedSender(token) => { + guard.blocker = BlockedSender(token); + true + } + BlockedReceiver(token) => { + drop(token); + false + } + } +} + +/// Wakes up a thread, dropping the lock at the correct time +fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) { + // We need to be careful to wake up the waiting thread *outside* of the mutex + // in case it incurs a context switch. + drop(guard); + token.signal(); +} + +impl<T> Packet<T> { + pub fn new(capacity: usize) -> Packet<T> { + Packet { + channels: AtomicUsize::new(1), + lock: Mutex::new(State { + disconnected: false, + blocker: NoneBlocked, + cap: capacity, + canceled: None, + queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() }, + buf: Buffer { + buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(), + start: 0, + size: 0, + }, + }), + } + } + + // wait until a send slot is available, returning locked access to + // the channel state. + fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> { + let mut node = Node { token: None, next: ptr::null_mut() }; + loop { + let mut guard = self.lock.lock().unwrap(); + // are we ready to go? + if guard.disconnected || guard.buf.size() < guard.buf.capacity() { + return guard; + } + // no room; actually block + let wait_token = guard.queue.enqueue(&mut node); + drop(guard); + wait_token.wait(); + } + } + + pub fn send(&self, t: T) -> Result<(), T> { + let mut guard = self.acquire_send_slot(); + if guard.disconnected { + return Err(t); + } + guard.buf.enqueue(t); + + match mem::replace(&mut guard.blocker, NoneBlocked) { + // if our capacity is 0, then we need to wait for a receiver to be + // available to take our data. After waiting, we check again to make + // sure the port didn't go away in the meantime. If it did, we need + // to hand back our data. + NoneBlocked if guard.cap == 0 => { + let mut canceled = false; + assert!(guard.canceled.is_none()); + guard.canceled = Some(unsafe { mem::transmute(&mut canceled) }); + let mut guard = wait(&self.lock, guard, BlockedSender); + if canceled { Err(guard.buf.dequeue()) } else { Ok(()) } + } + + // success, we buffered some data + NoneBlocked => Ok(()), + + // success, someone's about to receive our buffered data. + BlockedReceiver(token) => { + wakeup(token, guard); + Ok(()) + } + + BlockedSender(..) => panic!("lolwut"), + } + } + + pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> { + let mut guard = self.lock.lock().unwrap(); + if guard.disconnected { + Err(super::TrySendError::Disconnected(t)) + } else if guard.buf.size() == guard.buf.capacity() { + Err(super::TrySendError::Full(t)) + } else if guard.cap == 0 { + // With capacity 0, even though we have buffer space we can't + // transfer the data unless there's a receiver waiting. + match mem::replace(&mut guard.blocker, NoneBlocked) { + NoneBlocked => Err(super::TrySendError::Full(t)), + BlockedSender(..) => unreachable!(), + BlockedReceiver(token) => { + guard.buf.enqueue(t); + wakeup(token, guard); + Ok(()) + } + } + } else { + // If the buffer has some space and the capacity isn't 0, then we + // just enqueue the data for later retrieval, ensuring to wake up + // any blocked receiver if there is one. + assert!(guard.buf.size() < guard.buf.capacity()); + guard.buf.enqueue(t); + match mem::replace(&mut guard.blocker, NoneBlocked) { + BlockedReceiver(token) => wakeup(token, guard), + NoneBlocked => {} + BlockedSender(..) => unreachable!(), + } + Ok(()) + } + } + + // Receives a message from this channel + // + // When reading this, remember that there can only ever be one receiver at + // time. + pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> { + let mut guard = self.lock.lock().unwrap(); + + let mut woke_up_after_waiting = false; + // Wait for the buffer to have something in it. No need for a + // while loop because we're the only receiver. + if !guard.disconnected && guard.buf.size() == 0 { + if let Some(deadline) = deadline { + guard = + wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting); + } else { + guard = wait(&self.lock, guard, BlockedReceiver); + woke_up_after_waiting = true; + } + } + + // N.B., channel could be disconnected while waiting, so the order of + // these conditionals is important. + if guard.disconnected && guard.buf.size() == 0 { + return Err(Disconnected); + } + + // Pick up the data, wake up our neighbors, and carry on + assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting)); + + if guard.buf.size() == 0 { + return Err(Empty); + } + + let ret = guard.buf.dequeue(); + self.wakeup_senders(woke_up_after_waiting, guard); + Ok(ret) + } + + pub fn try_recv(&self) -> Result<T, Failure> { + let mut guard = self.lock.lock().unwrap(); + + // Easy cases first + if guard.disconnected && guard.buf.size() == 0 { + return Err(Disconnected); + } + if guard.buf.size() == 0 { + return Err(Empty); + } + + // Be sure to wake up neighbors + let ret = Ok(guard.buf.dequeue()); + self.wakeup_senders(false, guard); + ret + } + + // Wake up pending senders after some data has been received + // + // * `waited` - flag if the receiver blocked to receive some data, or if it + // just picked up some data on the way out + // * `guard` - the lock guard that is held over this channel's lock + fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) { + let pending_sender1: Option<SignalToken> = guard.queue.dequeue(); + + // If this is a no-buffer channel (cap == 0), then if we didn't wait we + // need to ACK the sender. If we waited, then the sender waking us up + // was already the ACK. + let pending_sender2 = if guard.cap == 0 && !waited { + match mem::replace(&mut guard.blocker, NoneBlocked) { + NoneBlocked => None, + BlockedReceiver(..) => unreachable!(), + BlockedSender(token) => { + guard.canceled.take(); + Some(token) + } + } + } else { + None + }; + mem::drop(guard); + + // only outside of the lock do we wake up the pending threads + if let Some(token) = pending_sender1 { + token.signal(); + } + if let Some(token) = pending_sender2 { + token.signal(); + } + } + + // Prepares this shared packet for a channel clone, essentially just bumping + // a refcount. + pub fn clone_chan(&self) { + let old_count = self.channels.fetch_add(1, Ordering::SeqCst); + + // See comments on Arc::clone() on why we do this (for `mem::forget`). + if old_count > MAX_REFCOUNT { + abort(); + } + } + + pub fn drop_chan(&self) { + // Only flag the channel as disconnected if we're the last channel + match self.channels.fetch_sub(1, Ordering::SeqCst) { + 1 => {} + _ => return, + } + + // Not much to do other than wake up a receiver if one's there + let mut guard = self.lock.lock().unwrap(); + if guard.disconnected { + return; + } + guard.disconnected = true; + match mem::replace(&mut guard.blocker, NoneBlocked) { + NoneBlocked => {} + BlockedSender(..) => unreachable!(), + BlockedReceiver(token) => wakeup(token, guard), + } + } + + pub fn drop_port(&self) { + let mut guard = self.lock.lock().unwrap(); + + if guard.disconnected { + return; + } + guard.disconnected = true; + + // If the capacity is 0, then the sender may want its data back after + // we're disconnected. Otherwise it's now our responsibility to destroy + // the buffered data. As with many other portions of this code, this + // needs to be careful to destroy the data *outside* of the lock to + // prevent deadlock. + let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() }; + let mut queue = + mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() }); + + let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) { + NoneBlocked => None, + BlockedSender(token) => { + *guard.canceled.take().unwrap() = true; + Some(token) + } + BlockedReceiver(..) => unreachable!(), + }; + mem::drop(guard); + + while let Some(token) = queue.dequeue() { + token.signal(); + } + if let Some(token) = waiter { + token.signal(); + } + } +} + +impl<T> Drop for Packet<T> { + fn drop(&mut self) { + assert_eq!(self.channels.load(Ordering::SeqCst), 0); + let mut guard = self.lock.lock().unwrap(); + assert!(guard.queue.dequeue().is_none()); + assert!(guard.canceled.is_none()); + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Buffer, a simple ring buffer backed by Vec<T> +//////////////////////////////////////////////////////////////////////////////// + +impl<T> Buffer<T> { + fn enqueue(&mut self, t: T) { + let pos = (self.start + self.size) % self.buf.len(); + self.size += 1; + let prev = mem::replace(&mut self.buf[pos], Some(t)); + assert!(prev.is_none()); + } + + fn dequeue(&mut self) -> T { + let start = self.start; + self.size -= 1; + self.start = (self.start + 1) % self.buf.len(); + let result = &mut self.buf[start]; + result.take().unwrap() + } + + fn size(&self) -> usize { + self.size + } + fn capacity(&self) -> usize { + self.buf.len() + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Queue, a simple queue to enqueue threads with (stack-allocated nodes) +//////////////////////////////////////////////////////////////////////////////// + +impl Queue { + fn enqueue(&mut self, node: &mut Node) -> WaitToken { + let (wait_token, signal_token) = blocking::tokens(); + node.token = Some(signal_token); + node.next = ptr::null_mut(); + + if self.tail.is_null() { + self.head = node as *mut Node; + self.tail = node as *mut Node; + } else { + unsafe { + (*self.tail).next = node as *mut Node; + self.tail = node as *mut Node; + } + } + + wait_token + } + + fn dequeue(&mut self) -> Option<SignalToken> { + if self.head.is_null() { + return None; + } + let node = self.head; + self.head = unsafe { (*node).next }; + if self.head.is_null() { + self.tail = ptr::null_mut(); + } + unsafe { + (*node).next = ptr::null_mut(); + Some((*node).token.take().unwrap()) + } + } +} diff --git a/library/std/src/sync/mutex.rs b/library/std/src/sync/mutex.rs new file mode 100644 index 00000000000..8478457eabf --- /dev/null +++ b/library/std/src/sync/mutex.rs @@ -0,0 +1,767 @@ +use crate::cell::UnsafeCell; +use crate::fmt; +use crate::mem; +use crate::ops::{Deref, DerefMut}; +use crate::ptr; +use crate::sys_common::mutex as sys; +use crate::sys_common::poison::{self, LockResult, TryLockError, TryLockResult}; + +/// A mutual exclusion primitive useful for protecting shared data +/// +/// This mutex will block threads waiting for the lock to become available. The +/// mutex can also be statically initialized or created via a [`new`] +/// constructor. Each mutex has a type parameter which represents the data that +/// it is protecting. The data can only be accessed through the RAII guards +/// returned from [`lock`] and [`try_lock`], which guarantees that the data is only +/// ever accessed when the mutex is locked. +/// +/// # Poisoning +/// +/// The mutexes in this module implement a strategy called "poisoning" where a +/// mutex is considered poisoned whenever a thread panics while holding the +/// mutex. Once a mutex is poisoned, all other threads are unable to access the +/// data by default as it is likely tainted (some invariant is not being +/// upheld). +/// +/// For a mutex, this means that the [`lock`] and [`try_lock`] methods return a +/// [`Result`] which indicates whether a mutex has been poisoned or not. Most +/// usage of a mutex will simply [`unwrap()`] these results, propagating panics +/// among threads to ensure that a possibly invalid invariant is not witnessed. +/// +/// A poisoned mutex, however, does not prevent all access to the underlying +/// data. The [`PoisonError`] type has an [`into_inner`] method which will return +/// the guard that would have otherwise been returned on a successful lock. This +/// allows access to the data, despite the lock being poisoned. +/// +/// [`new`]: #method.new +/// [`lock`]: #method.lock +/// [`try_lock`]: #method.try_lock +/// [`Result`]: ../../std/result/enum.Result.html +/// [`unwrap()`]: ../../std/result/enum.Result.html#method.unwrap +/// [`PoisonError`]: ../../std/sync/struct.PoisonError.html +/// [`into_inner`]: ../../std/sync/struct.PoisonError.html#method.into_inner +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Arc, Mutex}; +/// use std::thread; +/// use std::sync::mpsc::channel; +/// +/// const N: usize = 10; +/// +/// // Spawn a few threads to increment a shared variable (non-atomically), and +/// // let the main thread know once all increments are done. +/// // +/// // Here we're using an Arc to share memory among threads, and the data inside +/// // the Arc is protected with a mutex. +/// let data = Arc::new(Mutex::new(0)); +/// +/// let (tx, rx) = channel(); +/// for _ in 0..N { +/// let (data, tx) = (Arc::clone(&data), tx.clone()); +/// thread::spawn(move || { +/// // The shared state can only be accessed once the lock is held. +/// // Our non-atomic increment is safe because we're the only thread +/// // which can access the shared state when the lock is held. +/// // +/// // We unwrap() the return value to assert that we are not expecting +/// // threads to ever fail while holding the lock. +/// let mut data = data.lock().unwrap(); +/// *data += 1; +/// if *data == N { +/// tx.send(()).unwrap(); +/// } +/// // the lock is unlocked here when `data` goes out of scope. +/// }); +/// } +/// +/// rx.recv().unwrap(); +/// ``` +/// +/// To recover from a poisoned mutex: +/// +/// ``` +/// use std::sync::{Arc, Mutex}; +/// use std::thread; +/// +/// let lock = Arc::new(Mutex::new(0_u32)); +/// let lock2 = lock.clone(); +/// +/// let _ = thread::spawn(move || -> () { +/// // This thread will acquire the mutex first, unwrapping the result of +/// // `lock` because the lock has not been poisoned. +/// let _guard = lock2.lock().unwrap(); +/// +/// // This panic while holding the lock (`_guard` is in scope) will poison +/// // the mutex. +/// panic!(); +/// }).join(); +/// +/// // The lock is poisoned by this point, but the returned result can be +/// // pattern matched on to return the underlying guard on both branches. +/// let mut guard = match lock.lock() { +/// Ok(guard) => guard, +/// Err(poisoned) => poisoned.into_inner(), +/// }; +/// +/// *guard += 1; +/// ``` +/// +/// It is sometimes necessary to manually drop the mutex guard to unlock it +/// sooner than the end of the enclosing scope. +/// +/// ``` +/// use std::sync::{Arc, Mutex}; +/// use std::thread; +/// +/// const N: usize = 3; +/// +/// let data_mutex = Arc::new(Mutex::new(vec![1, 2, 3, 4])); +/// let res_mutex = Arc::new(Mutex::new(0)); +/// +/// let mut threads = Vec::with_capacity(N); +/// (0..N).for_each(|_| { +/// let data_mutex_clone = Arc::clone(&data_mutex); +/// let res_mutex_clone = Arc::clone(&res_mutex); +/// +/// threads.push(thread::spawn(move || { +/// let mut data = data_mutex_clone.lock().unwrap(); +/// // This is the result of some important and long-ish work. +/// let result = data.iter().fold(0, |acc, x| acc + x * 2); +/// data.push(result); +/// drop(data); +/// *res_mutex_clone.lock().unwrap() += result; +/// })); +/// }); +/// +/// let mut data = data_mutex.lock().unwrap(); +/// // This is the result of some important and long-ish work. +/// let result = data.iter().fold(0, |acc, x| acc + x * 2); +/// data.push(result); +/// // We drop the `data` explicitly because it's not necessary anymore and the +/// // thread still has work to do. This allow other threads to start working on +/// // the data immediately, without waiting for the rest of the unrelated work +/// // to be done here. +/// // +/// // It's even more important here than in the threads because we `.join` the +/// // threads after that. If we had not dropped the mutex guard, a thread could +/// // be waiting forever for it, causing a deadlock. +/// drop(data); +/// // Here the mutex guard is not assigned to a variable and so, even if the +/// // scope does not end after this line, the mutex is still released: there is +/// // no deadlock. +/// *res_mutex.lock().unwrap() += result; +/// +/// threads.into_iter().for_each(|thread| { +/// thread +/// .join() +/// .expect("The thread creating or execution failed !") +/// }); +/// +/// assert_eq!(*res_mutex.lock().unwrap(), 800); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +#[cfg_attr(not(test), rustc_diagnostic_item = "mutex_type")] +pub struct Mutex<T: ?Sized> { + // Note that this mutex is in a *box*, not inlined into the struct itself. + // Once a native mutex has been used once, its address can never change (it + // can't be moved). This mutex type can be safely moved at any time, so to + // ensure that the native mutex is used correctly we box the inner mutex to + // give it a constant address. + inner: Box<sys::Mutex>, + poison: poison::Flag, + data: UnsafeCell<T>, +} + +// these are the only places where `T: Send` matters; all other +// functionality works fine on a single thread. +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl<T: ?Sized + Send> Send for Mutex<T> {} +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {} + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +/// +/// The data protected by the mutex can be accessed through this guard via its +/// [`Deref`] and [`DerefMut`] implementations. +/// +/// This structure is created by the [`lock`] and [`try_lock`] methods on +/// [`Mutex`]. +/// +/// [`Deref`]: ../../std/ops/trait.Deref.html +/// [`DerefMut`]: ../../std/ops/trait.DerefMut.html +/// [`lock`]: struct.Mutex.html#method.lock +/// [`try_lock`]: struct.Mutex.html#method.try_lock +/// [`Mutex`]: struct.Mutex.html +#[must_use = "if unused the Mutex will immediately unlock"] +#[stable(feature = "rust1", since = "1.0.0")] +pub struct MutexGuard<'a, T: ?Sized + 'a> { + lock: &'a Mutex<T>, + poison: poison::Guard, +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized> !Send for MutexGuard<'_, T> {} +#[stable(feature = "mutexguard", since = "1.19.0")] +unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {} + +impl<T> Mutex<T> { + /// Creates a new mutex in an unlocked state ready for use. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Mutex; + /// + /// let mutex = Mutex::new(0); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn new(t: T) -> Mutex<T> { + let mut m = Mutex { + inner: box sys::Mutex::new(), + poison: poison::Flag::new(), + data: UnsafeCell::new(t), + }; + unsafe { + m.inner.init(); + } + m + } +} + +impl<T: ?Sized> Mutex<T> { + /// Acquires a mutex, blocking the current thread until it is able to do so. + /// + /// This function will block the local thread until it is available to acquire + /// the mutex. Upon returning, the thread is the only thread with the lock + /// held. An RAII guard is returned to allow scoped unlock of the lock. When + /// the guard goes out of scope, the mutex will be unlocked. + /// + /// The exact behavior on locking a mutex in the thread which already holds + /// the lock is left unspecified. However, this function will not return on + /// the second call (it might panic or deadlock, for example). + /// + /// # Errors + /// + /// If another user of this mutex panicked while holding the mutex, then + /// this call will return an error once the mutex is acquired. + /// + /// # Panics + /// + /// This function might panic when called if the lock is already held by + /// the current thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex}; + /// use std::thread; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = mutex.clone(); + /// + /// thread::spawn(move || { + /// *c_mutex.lock().unwrap() = 10; + /// }).join().expect("thread::spawn failed"); + /// assert_eq!(*mutex.lock().unwrap(), 10); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn lock(&self) -> LockResult<MutexGuard<'_, T>> { + unsafe { + self.inner.raw_lock(); + MutexGuard::new(self) + } + } + + /// Attempts to acquire this lock. + /// + /// If the lock could not be acquired at this time, then [`Err`] is returned. + /// Otherwise, an RAII guard is returned. The lock will be unlocked when the + /// guard is dropped. + /// + /// This function does not block. + /// + /// # Errors + /// + /// If another user of this mutex panicked while holding the mutex, then + /// this call will return failure if the mutex would otherwise be + /// acquired. + /// + /// [`Err`]: ../../std/result/enum.Result.html#variant.Err + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex}; + /// use std::thread; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = mutex.clone(); + /// + /// thread::spawn(move || { + /// let mut lock = c_mutex.try_lock(); + /// if let Ok(ref mut mutex) = lock { + /// **mutex = 10; + /// } else { + /// println!("try_lock failed"); + /// } + /// }).join().expect("thread::spawn failed"); + /// assert_eq!(*mutex.lock().unwrap(), 10); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> { + unsafe { + if self.inner.try_lock() { + Ok(MutexGuard::new(self)?) + } else { + Err(TryLockError::WouldBlock) + } + } + } + + /// Determines whether the mutex is poisoned. + /// + /// If another thread is active, the mutex can still become poisoned at any + /// time. You should not trust a `false` value for program correctness + /// without additional synchronization. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex}; + /// use std::thread; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = mutex.clone(); + /// + /// let _ = thread::spawn(move || { + /// let _lock = c_mutex.lock().unwrap(); + /// panic!(); // the mutex gets poisoned + /// }).join(); + /// assert_eq!(mutex.is_poisoned(), true); + /// ``` + #[inline] + #[stable(feature = "sync_poison", since = "1.2.0")] + pub fn is_poisoned(&self) -> bool { + self.poison.get() + } + + /// Consumes this mutex, returning the underlying data. + /// + /// # Errors + /// + /// If another user of this mutex panicked while holding the mutex, then + /// this call will return an error instead. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Mutex; + /// + /// let mutex = Mutex::new(0); + /// assert_eq!(mutex.into_inner().unwrap(), 0); + /// ``` + #[stable(feature = "mutex_into_inner", since = "1.6.0")] + pub fn into_inner(self) -> LockResult<T> + where + T: Sized, + { + // We know statically that there are no outstanding references to + // `self` so there's no need to lock the inner mutex. + // + // To get the inner value, we'd like to call `data.into_inner()`, + // but because `Mutex` impl-s `Drop`, we can't move out of it, so + // we'll have to destructure it manually instead. + unsafe { + // Like `let Mutex { inner, poison, data } = self`. + let (inner, poison, data) = { + let Mutex { ref inner, ref poison, ref data } = self; + (ptr::read(inner), ptr::read(poison), ptr::read(data)) + }; + mem::forget(self); + inner.destroy(); // Keep in sync with the `Drop` impl. + drop(inner); + + poison::map_result(poison.borrow(), |_| data.into_inner()) + } + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the `Mutex` mutably, no actual locking needs to + /// take place -- the mutable borrow statically guarantees no locks exist. + /// + /// # Errors + /// + /// If another user of this mutex panicked while holding the mutex, then + /// this call will return an error instead. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Mutex; + /// + /// let mut mutex = Mutex::new(0); + /// *mutex.get_mut().unwrap() = 10; + /// assert_eq!(*mutex.lock().unwrap(), 10); + /// ``` + #[stable(feature = "mutex_get_mut", since = "1.6.0")] + pub fn get_mut(&mut self) -> LockResult<&mut T> { + // We know statically that there are no other references to `self`, so + // there's no need to lock the inner mutex. + let data = unsafe { &mut *self.data.get() }; + poison::map_result(self.poison.borrow(), |_| data) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl<#[may_dangle] T: ?Sized> Drop for Mutex<T> { + fn drop(&mut self) { + // This is actually safe b/c we know that there is no further usage of + // this mutex (it's up to the user to arrange for a mutex to get + // dropped, that's not our job) + // + // IMPORTANT: This code must be kept in sync with `Mutex::into_inner`. + unsafe { self.inner.destroy() } + } +} + +#[stable(feature = "mutex_from", since = "1.24.0")] +impl<T> From<T> for Mutex<T> { + /// Creates a new mutex in an unlocked state ready for use. + /// This is equivalent to [`Mutex::new`]. + /// + /// [`Mutex::new`]: ../../std/sync/struct.Mutex.html#method.new + fn from(t: T) -> Self { + Mutex::new(t) + } +} + +#[stable(feature = "mutex_default", since = "1.10.0")] +impl<T: ?Sized + Default> Default for Mutex<T> { + /// Creates a `Mutex<T>`, with the `Default` value for T. + fn default() -> Mutex<T> { + Mutex::new(Default::default()) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.try_lock() { + Ok(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(), + Err(TryLockError::Poisoned(err)) => { + f.debug_struct("Mutex").field("data", &&**err.get_ref()).finish() + } + Err(TryLockError::WouldBlock) => { + struct LockedPlaceholder; + impl fmt::Debug for LockedPlaceholder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("<locked>") + } + } + + f.debug_struct("Mutex").field("data", &LockedPlaceholder).finish() + } + } + } +} + +impl<'mutex, T: ?Sized> MutexGuard<'mutex, T> { + unsafe fn new(lock: &'mutex Mutex<T>) -> LockResult<MutexGuard<'mutex, T>> { + poison::map_result(lock.poison.borrow(), |guard| MutexGuard { lock, poison: guard }) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized> Deref for MutexGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.data.get() } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized> DerefMut for MutexGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.data.get() } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized> Drop for MutexGuard<'_, T> { + #[inline] + fn drop(&mut self) { + unsafe { + self.lock.poison.done(&self.poison); + self.lock.inner.raw_unlock(); + } + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +#[stable(feature = "std_guard_impls", since = "1.20.0")] +impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +pub fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a sys::Mutex { + &guard.lock.inner +} + +pub fn guard_poison<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a poison::Flag { + &guard.lock.poison +} + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests { + use crate::sync::atomic::{AtomicUsize, Ordering}; + use crate::sync::mpsc::channel; + use crate::sync::{Arc, Condvar, Mutex}; + use crate::thread; + + struct Packet<T>(Arc<(Mutex<T>, Condvar)>); + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + #[test] + fn smoke() { + let m = Mutex::new(()); + drop(m.lock().unwrap()); + drop(m.lock().unwrap()); + } + + #[test] + fn lots_and_lots() { + const J: u32 = 1000; + const K: u32 = 3; + + let m = Arc::new(Mutex::new(0)); + + fn inc(m: &Mutex<u32>) { + for _ in 0..J { + *m.lock().unwrap() += 1; + } + } + + let (tx, rx) = channel(); + for _ in 0..K { + let tx2 = tx.clone(); + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2); + tx2.send(()).unwrap(); + }); + let tx2 = tx.clone(); + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2); + tx2.send(()).unwrap(); + }); + } + + drop(tx); + for _ in 0..2 * K { + rx.recv().unwrap(); + } + assert_eq!(*m.lock().unwrap(), J * K * 2); + } + + #[test] + fn try_lock() { + let m = Mutex::new(()); + *m.try_lock().unwrap() = (); + } + + #[test] + fn test_into_inner() { + let m = Mutex::new(NonCopy(10)); + assert_eq!(m.into_inner().unwrap(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc<AtomicUsize>); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = Mutex::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner().unwrap(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_into_inner_poison() { + let m = Arc::new(Mutex::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.lock().unwrap(); + panic!("test panic in inner thread to poison mutex"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().into_inner() { + Err(e) => assert_eq!(e.into_inner(), NonCopy(10)), + Ok(x) => panic!("into_inner of poisoned Mutex is Ok: {:?}", x), + } + } + + #[test] + fn test_get_mut() { + let mut m = Mutex::new(NonCopy(10)); + *m.get_mut().unwrap() = NonCopy(20); + assert_eq!(m.into_inner().unwrap(), NonCopy(20)); + } + + #[test] + fn test_get_mut_poison() { + let m = Arc::new(Mutex::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.lock().unwrap(); + panic!("test panic in inner thread to poison mutex"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().get_mut() { + Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)), + Ok(x) => panic!("get_mut of poisoned Mutex is Ok: {:?}", x), + } + } + + #[test] + fn test_mutex_arc_condvar() { + let packet = Packet(Arc::new((Mutex::new(false), Condvar::new()))); + let packet2 = Packet(packet.0.clone()); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + // wait until parent gets in + rx.recv().unwrap(); + let &(ref lock, ref cvar) = &*packet2.0; + let mut lock = lock.lock().unwrap(); + *lock = true; + cvar.notify_one(); + }); + + let &(ref lock, ref cvar) = &*packet.0; + let mut lock = lock.lock().unwrap(); + tx.send(()).unwrap(); + assert!(!*lock); + while !*lock { + lock = cvar.wait(lock).unwrap(); + } + } + + #[test] + fn test_arc_condvar_poison() { + let packet = Packet(Arc::new((Mutex::new(1), Condvar::new()))); + let packet2 = Packet(packet.0.clone()); + let (tx, rx) = channel(); + + let _t = thread::spawn(move || -> () { + rx.recv().unwrap(); + let &(ref lock, ref cvar) = &*packet2.0; + let _g = lock.lock().unwrap(); + cvar.notify_one(); + // Parent should fail when it wakes up. + panic!(); + }); + + let &(ref lock, ref cvar) = &*packet.0; + let mut lock = lock.lock().unwrap(); + tx.send(()).unwrap(); + while *lock == 1 { + match cvar.wait(lock) { + Ok(l) => { + lock = l; + assert_eq!(*lock, 1); + } + Err(..) => break, + } + } + } + + #[test] + fn test_mutex_arc_poison() { + let arc = Arc::new(Mutex::new(1)); + assert!(!arc.is_poisoned()); + let arc2 = arc.clone(); + let _ = thread::spawn(move || { + let lock = arc2.lock().unwrap(); + assert_eq!(*lock, 2); + }) + .join(); + assert!(arc.lock().is_err()); + assert!(arc.is_poisoned()); + } + + #[test] + fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. + let arc = Arc::new(Mutex::new(1)); + let arc2 = Arc::new(Mutex::new(arc)); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let lock = arc2.lock().unwrap(); + let lock2 = lock.lock().unwrap(); + assert_eq!(*lock2, 1); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); + } + + #[test] + fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(Mutex::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc<Mutex<i32>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + *self.i.lock().unwrap() += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.lock().unwrap(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_mutex_unsized() { + let mutex: &Mutex<[i32]> = &Mutex::new([1, 2, 3]); + { + let b = &mut *mutex.lock().unwrap(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*mutex.lock().unwrap(), comp); + } +} diff --git a/library/std/src/sync/once.rs b/library/std/src/sync/once.rs new file mode 100644 index 00000000000..64260990824 --- /dev/null +++ b/library/std/src/sync/once.rs @@ -0,0 +1,690 @@ +//! A "once initialization" primitive +//! +//! This primitive is meant to be used to run one-time initialization. An +//! example use case would be for initializing an FFI library. + +// A "once" is a relatively simple primitive, and it's also typically provided +// by the OS as well (see `pthread_once` or `InitOnceExecuteOnce`). The OS +// primitives, however, tend to have surprising restrictions, such as the Unix +// one doesn't allow an argument to be passed to the function. +// +// As a result, we end up implementing it ourselves in the standard library. +// This also gives us the opportunity to optimize the implementation a bit which +// should help the fast path on call sites. Consequently, let's explain how this +// primitive works now! +// +// So to recap, the guarantees of a Once are that it will call the +// initialization closure at most once, and it will never return until the one +// that's running has finished running. This means that we need some form of +// blocking here while the custom callback is running at the very least. +// Additionally, we add on the restriction of **poisoning**. Whenever an +// initialization closure panics, the Once enters a "poisoned" state which means +// that all future calls will immediately panic as well. +// +// So to implement this, one might first reach for a `Mutex`, but those cannot +// be put into a `static`. It also gets a lot harder with poisoning to figure +// out when the mutex needs to be deallocated because it's not after the closure +// finishes, but after the first successful closure finishes. +// +// All in all, this is instead implemented with atomics and lock-free +// operations! Whee! Each `Once` has one word of atomic state, and this state is +// CAS'd on to determine what to do. There are four possible state of a `Once`: +// +// * Incomplete - no initialization has run yet, and no thread is currently +// using the Once. +// * Poisoned - some thread has previously attempted to initialize the Once, but +// it panicked, so the Once is now poisoned. There are no other +// threads currently accessing this Once. +// * Running - some thread is currently attempting to run initialization. It may +// succeed, so all future threads need to wait for it to finish. +// Note that this state is accompanied with a payload, described +// below. +// * Complete - initialization has completed and all future calls should finish +// immediately. +// +// With 4 states we need 2 bits to encode this, and we use the remaining bits +// in the word we have allocated as a queue of threads waiting for the thread +// responsible for entering the RUNNING state. This queue is just a linked list +// of Waiter nodes which is monotonically increasing in size. Each node is +// allocated on the stack, and whenever the running closure finishes it will +// consume the entire queue and notify all waiters they should try again. +// +// You'll find a few more details in the implementation, but that's the gist of +// it! +// +// Atomic orderings: +// When running `Once` we deal with multiple atomics: +// `Once.state_and_queue` and an unknown number of `Waiter.signaled`. +// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the +// result of the `Once`, and (3) for synchronizing `Waiter` nodes. +// - At the end of the `call_inner` function we have to make sure the result +// of the `Once` is acquired. So every load which can be the only one to +// load COMPLETED must have at least Acquire ordering, which means all +// three of them. +// - `WaiterQueue::Drop` is the only place that may store COMPLETED, and +// must do so with Release ordering to make the result available. +// - `wait` inserts `Waiter` nodes as a pointer in `state_and_queue`, and +// needs to make the nodes available with Release ordering. The load in +// its `compare_and_swap` can be Relaxed because it only has to compare +// the atomic, not to read other data. +// - `WaiterQueue::Drop` must see the `Waiter` nodes, so it must load +// `state_and_queue` with Acquire ordering. +// - There is just one store where `state_and_queue` is used only as a +// state flag, without having to synchronize data: switching the state +// from INCOMPLETE to RUNNING in `call_inner`. This store can be Relaxed, +// but the read has to be Acquire because of the requirements mentioned +// above. +// * `Waiter.signaled` is both used as a flag, and to protect a field with +// interior mutability in `Waiter`. `Waiter.thread` is changed in +// `WaiterQueue::Drop` which then sets `signaled` with Release ordering. +// After `wait` loads `signaled` with Acquire and sees it is true, it needs to +// see the changes to drop the `Waiter` struct correctly. +// * There is one place where the two atomics `Once.state_and_queue` and +// `Waiter.signaled` come together, and might be reordered by the compiler or +// processor. Because both use Aquire ordering such a reordering is not +// allowed, so no need for SeqCst. + +use crate::cell::Cell; +use crate::fmt; +use crate::marker; +use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use crate::thread::{self, Thread}; + +/// A synchronization primitive which can be used to run a one-time global +/// initialization. Useful for one-time initialization for FFI or related +/// functionality. This type can only be constructed with the [`Once::new`] +/// constructor. +/// +/// [`Once::new`]: struct.Once.html#method.new +/// +/// # Examples +/// +/// ``` +/// use std::sync::Once; +/// +/// static START: Once = Once::new(); +/// +/// START.call_once(|| { +/// // run initialization here +/// }); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct Once { + // `state_and_queue` is actually an a pointer to a `Waiter` with extra state + // bits, so we add the `PhantomData` appropriately. + state_and_queue: AtomicUsize, + _marker: marker::PhantomData<*const Waiter>, +} + +// The `PhantomData` of a raw pointer removes these two auto traits, but we +// enforce both below in the implementation so this should be safe to add. +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Sync for Once {} +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Send for Once {} + +/// State yielded to [`call_once_force`]’s closure parameter. The state can be +/// used to query the poison status of the [`Once`]. +/// +/// [`call_once_force`]: struct.Once.html#method.call_once_force +/// [`Once`]: struct.Once.html +#[unstable(feature = "once_poison", issue = "33577")] +#[derive(Debug)] +pub struct OnceState { + poisoned: bool, + set_state_on_drop_to: Cell<usize>, +} + +/// Initialization value for static [`Once`] values. +/// +/// [`Once`]: struct.Once.html +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Once, ONCE_INIT}; +/// +/// static START: Once = ONCE_INIT; +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +#[rustc_deprecated( + since = "1.38.0", + reason = "the `new` function is now preferred", + suggestion = "Once::new()" +)] +pub const ONCE_INIT: Once = Once::new(); + +// Four states that a Once can be in, encoded into the lower bits of +// `state_and_queue` in the Once structure. +const INCOMPLETE: usize = 0x0; +const POISONED: usize = 0x1; +const RUNNING: usize = 0x2; +const COMPLETE: usize = 0x3; + +// Mask to learn about the state. All other bits are the queue of waiters if +// this is in the RUNNING state. +const STATE_MASK: usize = 0x3; + +// Representation of a node in the linked list of waiters, used while in the +// RUNNING state. +// Note: `Waiter` can't hold a mutable pointer to the next thread, because then +// `wait` would both hand out a mutable reference to its `Waiter` node, and keep +// a shared reference to check `signaled`. Instead we hold shared references and +// use interior mutability. +#[repr(align(4))] // Ensure the two lower bits are free to use as state bits. +struct Waiter { + thread: Cell<Option<Thread>>, + signaled: AtomicBool, + next: *const Waiter, +} + +// Head of a linked list of waiters. +// Every node is a struct on the stack of a waiting thread. +// Will wake up the waiters when it gets dropped, i.e. also on panic. +struct WaiterQueue<'a> { + state_and_queue: &'a AtomicUsize, + set_state_on_drop_to: usize, +} + +impl Once { + /// Creates a new `Once` value. + #[stable(feature = "once_new", since = "1.2.0")] + #[rustc_const_stable(feature = "const_once_new", since = "1.32.0")] + pub const fn new() -> Once { + Once { state_and_queue: AtomicUsize::new(INCOMPLETE), _marker: marker::PhantomData } + } + + /// Performs an initialization routine once and only once. The given closure + /// will be executed if this is the first time `call_once` has been called, + /// and otherwise the routine will *not* be invoked. + /// + /// This method will block the calling thread if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it may not be the closure specified). It is also + /// guaranteed that any memory writes performed by the executed closure can + /// be reliably observed by other threads at this point (there is a + /// happens-before relation between the closure and code executing after the + /// return). + /// + /// If the given closure recursively invokes `call_once` on the same `Once` + /// instance the exact behavior is not specified, allowed outcomes are + /// a panic or a deadlock. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Once; + /// + /// static mut VAL: usize = 0; + /// static INIT: Once = Once::new(); + /// + /// // Accessing a `static mut` is unsafe much of the time, but if we do so + /// // in a synchronized fashion (e.g., write once or read all) then we're + /// // good to go! + /// // + /// // This function will only call `expensive_computation` once, and will + /// // otherwise always return the value returned from the first invocation. + /// fn get_cached_val() -> usize { + /// unsafe { + /// INIT.call_once(|| { + /// VAL = expensive_computation(); + /// }); + /// VAL + /// } + /// } + /// + /// fn expensive_computation() -> usize { + /// // ... + /// # 2 + /// } + /// ``` + /// + /// # Panics + /// + /// The closure `f` will only be executed once if this is called + /// concurrently amongst many threads. If that closure panics, however, then + /// it will *poison* this `Once` instance, causing all future invocations of + /// `call_once` to also panic. + /// + /// This is similar to [poisoning with mutexes][poison]. + /// + /// [poison]: struct.Mutex.html#poisoning + #[stable(feature = "rust1", since = "1.0.0")] + pub fn call_once<F>(&self, f: F) + where + F: FnOnce(), + { + // Fast path check + if self.is_completed() { + return; + } + + let mut f = Some(f); + self.call_inner(false, &mut |_| f.take().unwrap()()); + } + + /// Performs the same function as [`call_once`] except ignores poisoning. + /// + /// Unlike [`call_once`], if this `Once` has been poisoned (i.e., a previous + /// call to `call_once` or `call_once_force` caused a panic), calling + /// `call_once_force` will still invoke the closure `f` and will _not_ + /// result in an immediate panic. If `f` panics, the `Once` will remain + /// in a poison state. If `f` does _not_ panic, the `Once` will no + /// longer be in a poison state and all future calls to `call_once` or + /// `call_once_force` will be no-ops. + /// + /// The closure `f` is yielded a [`OnceState`] structure which can be used + /// to query the poison status of the `Once`. + /// + /// [`call_once`]: struct.Once.html#method.call_once + /// [`OnceState`]: struct.OnceState.html + /// + /// # Examples + /// + /// ``` + /// #![feature(once_poison)] + /// + /// use std::sync::Once; + /// use std::thread; + /// + /// static INIT: Once = Once::new(); + /// + /// // poison the once + /// let handle = thread::spawn(|| { + /// INIT.call_once(|| panic!()); + /// }); + /// assert!(handle.join().is_err()); + /// + /// // poisoning propagates + /// let handle = thread::spawn(|| { + /// INIT.call_once(|| {}); + /// }); + /// assert!(handle.join().is_err()); + /// + /// // call_once_force will still run and reset the poisoned state + /// INIT.call_once_force(|state| { + /// assert!(state.poisoned()); + /// }); + /// + /// // once any success happens, we stop propagating the poison + /// INIT.call_once(|| {}); + /// ``` + #[unstable(feature = "once_poison", issue = "33577")] + pub fn call_once_force<F>(&self, f: F) + where + F: FnOnce(&OnceState), + { + // Fast path check + if self.is_completed() { + return; + } + + let mut f = Some(f); + self.call_inner(true, &mut |p| f.take().unwrap()(p)); + } + + /// Returns `true` if some `call_once` call has completed + /// successfully. Specifically, `is_completed` will return false in + /// the following situations: + /// * `call_once` was not called at all, + /// * `call_once` was called, but has not yet completed, + /// * the `Once` instance is poisoned + /// + /// This function returning `false` does not mean that `Once` has not been + /// executed. For example, it may have been executed in the time between + /// when `is_completed` starts executing and when it returns, in which case + /// the `false` return value would be stale (but still permissible). + /// + /// # Examples + /// + /// ``` + /// use std::sync::Once; + /// + /// static INIT: Once = Once::new(); + /// + /// assert_eq!(INIT.is_completed(), false); + /// INIT.call_once(|| { + /// assert_eq!(INIT.is_completed(), false); + /// }); + /// assert_eq!(INIT.is_completed(), true); + /// ``` + /// + /// ``` + /// use std::sync::Once; + /// use std::thread; + /// + /// static INIT: Once = Once::new(); + /// + /// assert_eq!(INIT.is_completed(), false); + /// let handle = thread::spawn(|| { + /// INIT.call_once(|| panic!()); + /// }); + /// assert!(handle.join().is_err()); + /// assert_eq!(INIT.is_completed(), false); + /// ``` + #[stable(feature = "once_is_completed", since = "1.43.0")] + #[inline] + pub fn is_completed(&self) -> bool { + // An `Acquire` load is enough because that makes all the initialization + // operations visible to us, and, this being a fast path, weaker + // ordering helps with performance. This `Acquire` synchronizes with + // `Release` operations on the slow path. + self.state_and_queue.load(Ordering::Acquire) == COMPLETE + } + + // This is a non-generic function to reduce the monomorphization cost of + // using `call_once` (this isn't exactly a trivial or small implementation). + // + // Additionally, this is tagged with `#[cold]` as it should indeed be cold + // and it helps let LLVM know that calls to this function should be off the + // fast path. Essentially, this should help generate more straight line code + // in LLVM. + // + // Finally, this takes an `FnMut` instead of a `FnOnce` because there's + // currently no way to take an `FnOnce` and call it via virtual dispatch + // without some allocation overhead. + #[cold] + fn call_inner(&self, ignore_poisoning: bool, init: &mut dyn FnMut(&OnceState)) { + let mut state_and_queue = self.state_and_queue.load(Ordering::Acquire); + loop { + match state_and_queue { + COMPLETE => break, + POISONED if !ignore_poisoning => { + // Panic to propagate the poison. + panic!("Once instance has previously been poisoned"); + } + POISONED | INCOMPLETE => { + // Try to register this thread as the one RUNNING. + let old = self.state_and_queue.compare_and_swap( + state_and_queue, + RUNNING, + Ordering::Acquire, + ); + if old != state_and_queue { + state_and_queue = old; + continue; + } + // `waiter_queue` will manage other waiting threads, and + // wake them up on drop. + let mut waiter_queue = WaiterQueue { + state_and_queue: &self.state_and_queue, + set_state_on_drop_to: POISONED, + }; + // Run the initialization function, letting it know if we're + // poisoned or not. + let init_state = OnceState { + poisoned: state_and_queue == POISONED, + set_state_on_drop_to: Cell::new(COMPLETE), + }; + init(&init_state); + waiter_queue.set_state_on_drop_to = init_state.set_state_on_drop_to.get(); + break; + } + _ => { + // All other values must be RUNNING with possibly a + // pointer to the waiter queue in the more significant bits. + assert!(state_and_queue & STATE_MASK == RUNNING); + wait(&self.state_and_queue, state_and_queue); + state_and_queue = self.state_and_queue.load(Ordering::Acquire); + } + } + } + } +} + +fn wait(state_and_queue: &AtomicUsize, mut current_state: usize) { + // Note: the following code was carefully written to avoid creating a + // mutable reference to `node` that gets aliased. + loop { + // Don't queue this thread if the status is no longer running, + // otherwise we will not be woken up. + if current_state & STATE_MASK != RUNNING { + return; + } + + // Create the node for our current thread. + let node = Waiter { + thread: Cell::new(Some(thread::current())), + signaled: AtomicBool::new(false), + next: (current_state & !STATE_MASK) as *const Waiter, + }; + let me = &node as *const Waiter as usize; + + // Try to slide in the node at the head of the linked list, making sure + // that another thread didn't just replace the head of the linked list. + let old = state_and_queue.compare_and_swap(current_state, me | RUNNING, Ordering::Release); + if old != current_state { + current_state = old; + continue; + } + + // We have enqueued ourselves, now lets wait. + // It is important not to return before being signaled, otherwise we + // would drop our `Waiter` node and leave a hole in the linked list + // (and a dangling reference). Guard against spurious wakeups by + // reparking ourselves until we are signaled. + while !node.signaled.load(Ordering::Acquire) { + // If the managing thread happens to signal and unpark us before we + // can park ourselves, the result could be this thread never gets + // unparked. Luckily `park` comes with the guarantee that if it got + // an `unpark` just before on an unparked thread is does not park. + thread::park(); + } + break; + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl fmt::Debug for Once { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Once { .. }") + } +} + +impl Drop for WaiterQueue<'_> { + fn drop(&mut self) { + // Swap out our state with however we finished. + let state_and_queue = + self.state_and_queue.swap(self.set_state_on_drop_to, Ordering::AcqRel); + + // We should only ever see an old state which was RUNNING. + assert_eq!(state_and_queue & STATE_MASK, RUNNING); + + // Walk the entire linked list of waiters and wake them up (in lifo + // order, last to register is first to wake up). + unsafe { + // Right after setting `node.signaled = true` the other thread may + // free `node` if there happens to be has a spurious wakeup. + // So we have to take out the `thread` field and copy the pointer to + // `next` first. + let mut queue = (state_and_queue & !STATE_MASK) as *const Waiter; + while !queue.is_null() { + let next = (*queue).next; + let thread = (*queue).thread.take().unwrap(); + (*queue).signaled.store(true, Ordering::Release); + // ^- FIXME (maybe): This is another case of issue #55005 + // `store()` has a potentially dangling ref to `signaled`. + queue = next; + thread.unpark(); + } + } + } +} + +impl OnceState { + /// Returns `true` if the associated [`Once`] was poisoned prior to the + /// invocation of the closure passed to [`call_once_force`]. + /// + /// [`call_once_force`]: struct.Once.html#method.call_once_force + /// [`Once`]: struct.Once.html + /// + /// # Examples + /// + /// A poisoned `Once`: + /// + /// ``` + /// #![feature(once_poison)] + /// + /// use std::sync::Once; + /// use std::thread; + /// + /// static INIT: Once = Once::new(); + /// + /// // poison the once + /// let handle = thread::spawn(|| { + /// INIT.call_once(|| panic!()); + /// }); + /// assert!(handle.join().is_err()); + /// + /// INIT.call_once_force(|state| { + /// assert!(state.poisoned()); + /// }); + /// ``` + /// + /// An unpoisoned `Once`: + /// + /// ``` + /// #![feature(once_poison)] + /// + /// use std::sync::Once; + /// + /// static INIT: Once = Once::new(); + /// + /// INIT.call_once_force(|state| { + /// assert!(!state.poisoned()); + /// }); + #[unstable(feature = "once_poison", issue = "33577")] + pub fn poisoned(&self) -> bool { + self.poisoned + } + + /// Poison the associated [`Once`] without explicitly panicking. + /// + /// [`Once`]: struct.Once.html + // NOTE: This is currently only exposed for the `lazy` module + pub(crate) fn poison(&self) { + self.set_state_on_drop_to.set(POISONED); + } +} + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests { + use super::Once; + use crate::panic; + use crate::sync::mpsc::channel; + use crate::thread; + + #[test] + fn smoke_once() { + static O: Once = Once::new(); + let mut a = 0; + O.call_once(|| a += 1); + assert_eq!(a, 1); + O.call_once(|| a += 1); + assert_eq!(a, 1); + } + + #[test] + fn stampede_once() { + static O: Once = Once::new(); + static mut RUN: bool = false; + + let (tx, rx) = channel(); + for _ in 0..10 { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..4 { + thread::yield_now() + } + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + tx.send(()).unwrap(); + }); + } + + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + + for _ in 0..10 { + rx.recv().unwrap(); + } + } + + #[test] + fn poison_bad() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // poisoning propagates + let t = panic::catch_unwind(|| { + O.call_once(|| {}); + }); + assert!(t.is_err()); + + // we can subvert poisoning, however + let mut called = false; + O.call_once_force(|p| { + called = true; + assert!(p.poisoned()) + }); + assert!(called); + + // once any success happens, we stop propagating the poison + O.call_once(|| {}); + } + + #[test] + fn wait_for_force_to_finish() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // make sure someone's waiting inside the once via a force + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let t1 = thread::spawn(move || { + O.call_once_force(|p| { + assert!(p.poisoned()); + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + }); + }); + + rx1.recv().unwrap(); + + // put another waiter on the once + let t2 = thread::spawn(|| { + let mut called = false; + O.call_once(|| { + called = true; + }); + assert!(!called); + }); + + tx2.send(()).unwrap(); + + assert!(t1.join().is_ok()); + assert!(t2.join().is_ok()); + } +} diff --git a/library/std/src/sync/rwlock.rs b/library/std/src/sync/rwlock.rs new file mode 100644 index 00000000000..50f54dbf143 --- /dev/null +++ b/library/std/src/sync/rwlock.rs @@ -0,0 +1,799 @@ +use crate::cell::UnsafeCell; +use crate::fmt; +use crate::mem; +use crate::ops::{Deref, DerefMut}; +use crate::ptr; +use crate::sys_common::poison::{self, LockResult, TryLockError, TryLockResult}; +use crate::sys_common::rwlock as sys; + +/// A reader-writer lock +/// +/// This type of lock allows a number of readers or at most one writer at any +/// point in time. The write portion of this lock typically allows modification +/// of the underlying data (exclusive access) and the read portion of this lock +/// typically allows for read-only access (shared access). +/// +/// In comparison, a [`Mutex`] does not distinguish between readers or writers +/// that acquire the lock, therefore blocking any threads waiting for the lock to +/// become available. An `RwLock` will allow any number of readers to acquire the +/// lock as long as a writer is not holding the lock. +/// +/// The priority policy of the lock is dependent on the underlying operating +/// system's implementation, and this type does not guarantee that any +/// particular policy will be used. +/// +/// The type parameter `T` represents the data that this lock protects. It is +/// required that `T` satisfies [`Send`] to be shared across threads and +/// [`Sync`] to allow concurrent access through readers. The RAII guards +/// returned from the locking methods implement [`Deref`] (and [`DerefMut`] +/// for the `write` methods) to allow access to the content of the lock. +/// +/// # Poisoning +/// +/// An `RwLock`, like [`Mutex`], will become poisoned on a panic. Note, however, +/// that an `RwLock` may only be poisoned if a panic occurs while it is locked +/// exclusively (write mode). If a panic occurs in any reader, then the lock +/// will not be poisoned. +/// +/// # Examples +/// +/// ``` +/// use std::sync::RwLock; +/// +/// let lock = RwLock::new(5); +/// +/// // many reader locks can be held at once +/// { +/// let r1 = lock.read().unwrap(); +/// let r2 = lock.read().unwrap(); +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// } // read locks are dropped at this point +/// +/// // only one write lock may be held, however +/// { +/// let mut w = lock.write().unwrap(); +/// *w += 1; +/// assert_eq!(*w, 6); +/// } // write lock is dropped here +/// ``` +/// +/// [`Deref`]: ../../std/ops/trait.Deref.html +/// [`DerefMut`]: ../../std/ops/trait.DerefMut.html +/// [`Send`]: ../../std/marker/trait.Send.html +/// [`Sync`]: ../../std/marker/trait.Sync.html +/// [`Mutex`]: struct.Mutex.html +#[stable(feature = "rust1", since = "1.0.0")] +pub struct RwLock<T: ?Sized> { + inner: Box<sys::RWLock>, + poison: poison::Flag, + data: UnsafeCell<T>, +} + +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl<T: ?Sized + Send> Send for RwLock<T> {} +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {} + +/// RAII structure used to release the shared read access of a lock when +/// dropped. +/// +/// This structure is created by the [`read`] and [`try_read`] methods on +/// [`RwLock`]. +/// +/// [`read`]: struct.RwLock.html#method.read +/// [`try_read`]: struct.RwLock.html#method.try_read +/// [`RwLock`]: struct.RwLock.html +#[must_use = "if unused the RwLock will immediately unlock"] +#[stable(feature = "rust1", since = "1.0.0")] +pub struct RwLockReadGuard<'a, T: ?Sized + 'a> { + lock: &'a RwLock<T>, +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized> !Send for RwLockReadGuard<'_, T> {} + +#[stable(feature = "rwlock_guard_sync", since = "1.23.0")] +unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {} + +/// RAII structure used to release the exclusive write access of a lock when +/// dropped. +/// +/// This structure is created by the [`write`] and [`try_write`] methods +/// on [`RwLock`]. +/// +/// [`write`]: struct.RwLock.html#method.write +/// [`try_write`]: struct.RwLock.html#method.try_write +/// [`RwLock`]: struct.RwLock.html +#[must_use = "if unused the RwLock will immediately unlock"] +#[stable(feature = "rust1", since = "1.0.0")] +pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> { + lock: &'a RwLock<T>, + poison: poison::Guard, +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized> !Send for RwLockWriteGuard<'_, T> {} + +#[stable(feature = "rwlock_guard_sync", since = "1.23.0")] +unsafe impl<T: ?Sized + Sync> Sync for RwLockWriteGuard<'_, T> {} + +impl<T> RwLock<T> { + /// Creates a new instance of an `RwLock<T>` which is unlocked. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(5); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn new(t: T) -> RwLock<T> { + RwLock { + inner: box sys::RWLock::new(), + poison: poison::Flag::new(), + data: UnsafeCell::new(t), + } + } +} + +impl<T: ?Sized> RwLock<T> { + /// Locks this rwlock with shared read access, blocking the current thread + /// until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which + /// hold the lock. There may be other readers currently inside the lock when + /// this method returns. This method does not provide any guarantees with + /// respect to the ordering of whether contentious readers or writers will + /// acquire the lock first. + /// + /// Returns an RAII guard which will release this thread's shared access + /// once it is dropped. + /// + /// # Errors + /// + /// This function will return an error if the RwLock is poisoned. An RwLock + /// is poisoned whenever a writer panics while holding an exclusive lock. + /// The failure will occur immediately after the lock has been acquired. + /// + /// # Panics + /// + /// This function might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, RwLock}; + /// use std::thread; + /// + /// let lock = Arc::new(RwLock::new(1)); + /// let c_lock = lock.clone(); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// thread::spawn(move || { + /// let r = c_lock.read(); + /// assert!(r.is_ok()); + /// }).join().unwrap(); + /// ``` + #[inline] + #[stable(feature = "rust1", since = "1.0.0")] + pub fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> { + unsafe { + self.inner.read(); + RwLockReadGuard::new(self) + } + } + + /// Attempts to acquire this rwlock with shared read access. + /// + /// If the access could not be granted at this time, then `Err` is returned. + /// Otherwise, an RAII guard is returned which will release the shared access + /// when it is dropped. + /// + /// This function does not block. + /// + /// This function does not provide any guarantees with respect to the ordering + /// of whether contentious readers or writers will acquire the lock first. + /// + /// # Errors + /// + /// This function will return an error if the RwLock is poisoned. An RwLock + /// is poisoned whenever a writer panics while holding an exclusive lock. An + /// error will only be returned if the lock would have otherwise been + /// acquired. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// match lock.try_read() { + /// Ok(n) => assert_eq!(*n, 1), + /// Err(_) => unreachable!(), + /// }; + /// ``` + #[inline] + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_read(&self) -> TryLockResult<RwLockReadGuard<'_, T>> { + unsafe { + if self.inner.try_read() { + Ok(RwLockReadGuard::new(self)?) + } else { + Err(TryLockError::WouldBlock) + } + } + } + + /// Locks this rwlock with exclusive write access, blocking the current + /// thread until it can be acquired. + /// + /// This function will not return while other writers or other readers + /// currently have access to the lock. + /// + /// Returns an RAII guard which will drop the write access of this rwlock + /// when dropped. + /// + /// # Errors + /// + /// This function will return an error if the RwLock is poisoned. An RwLock + /// is poisoned whenever a writer panics while holding an exclusive lock. + /// An error will be returned when the lock is acquired. + /// + /// # Panics + /// + /// This function might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// let mut n = lock.write().unwrap(); + /// *n = 2; + /// + /// assert!(lock.try_read().is_err()); + /// ``` + #[inline] + #[stable(feature = "rust1", since = "1.0.0")] + pub fn write(&self) -> LockResult<RwLockWriteGuard<'_, T>> { + unsafe { + self.inner.write(); + RwLockWriteGuard::new(self) + } + } + + /// Attempts to lock this rwlock with exclusive write access. + /// + /// If the lock could not be acquired at this time, then `Err` is returned. + /// Otherwise, an RAII guard is returned which will release the lock when + /// it is dropped. + /// + /// This function does not block. + /// + /// This function does not provide any guarantees with respect to the ordering + /// of whether contentious readers or writers will acquire the lock first. + /// + /// # Errors + /// + /// This function will return an error if the RwLock is poisoned. An RwLock + /// is poisoned whenever a writer panics while holding an exclusive lock. An + /// error will only be returned if the lock would have otherwise been + /// acquired. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// assert!(lock.try_write().is_err()); + /// ``` + #[inline] + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_write(&self) -> TryLockResult<RwLockWriteGuard<'_, T>> { + unsafe { + if self.inner.try_write() { + Ok(RwLockWriteGuard::new(self)?) + } else { + Err(TryLockError::WouldBlock) + } + } + } + + /// Determines whether the lock is poisoned. + /// + /// If another thread is active, the lock can still become poisoned at any + /// time. You should not trust a `false` value for program correctness + /// without additional synchronization. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, RwLock}; + /// use std::thread; + /// + /// let lock = Arc::new(RwLock::new(0)); + /// let c_lock = lock.clone(); + /// + /// let _ = thread::spawn(move || { + /// let _lock = c_lock.write().unwrap(); + /// panic!(); // the lock gets poisoned + /// }).join(); + /// assert_eq!(lock.is_poisoned(), true); + /// ``` + #[inline] + #[stable(feature = "sync_poison", since = "1.2.0")] + pub fn is_poisoned(&self) -> bool { + self.poison.get() + } + + /// Consumes this `RwLock`, returning the underlying data. + /// + /// # Errors + /// + /// This function will return an error if the RwLock is poisoned. An RwLock + /// is poisoned whenever a writer panics while holding an exclusive lock. An + /// error will only be returned if the lock would have otherwise been + /// acquired. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(String::new()); + /// { + /// let mut s = lock.write().unwrap(); + /// *s = "modified".to_owned(); + /// } + /// assert_eq!(lock.into_inner().unwrap(), "modified"); + /// ``` + #[stable(feature = "rwlock_into_inner", since = "1.6.0")] + pub fn into_inner(self) -> LockResult<T> + where + T: Sized, + { + // We know statically that there are no outstanding references to + // `self` so there's no need to lock the inner lock. + // + // To get the inner value, we'd like to call `data.into_inner()`, + // but because `RwLock` impl-s `Drop`, we can't move out of it, so + // we'll have to destructure it manually instead. + unsafe { + // Like `let RwLock { inner, poison, data } = self`. + let (inner, poison, data) = { + let RwLock { ref inner, ref poison, ref data } = self; + (ptr::read(inner), ptr::read(poison), ptr::read(data)) + }; + mem::forget(self); + inner.destroy(); // Keep in sync with the `Drop` impl. + drop(inner); + + poison::map_result(poison.borrow(), |_| data.into_inner()) + } + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the `RwLock` mutably, no actual locking needs to + /// take place -- the mutable borrow statically guarantees no locks exist. + /// + /// # Errors + /// + /// This function will return an error if the RwLock is poisoned. An RwLock + /// is poisoned whenever a writer panics while holding an exclusive lock. An + /// error will only be returned if the lock would have otherwise been + /// acquired. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let mut lock = RwLock::new(0); + /// *lock.get_mut().unwrap() = 10; + /// assert_eq!(*lock.read().unwrap(), 10); + /// ``` + #[stable(feature = "rwlock_get_mut", since = "1.6.0")] + pub fn get_mut(&mut self) -> LockResult<&mut T> { + // We know statically that there are no other references to `self`, so + // there's no need to lock the inner lock. + let data = unsafe { &mut *self.data.get() }; + poison::map_result(self.poison.borrow(), |_| data) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl<#[may_dangle] T: ?Sized> Drop for RwLock<T> { + fn drop(&mut self) { + // IMPORTANT: This code needs to be kept in sync with `RwLock::into_inner`. + unsafe { self.inner.destroy() } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLock<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.try_read() { + Ok(guard) => f.debug_struct("RwLock").field("data", &&*guard).finish(), + Err(TryLockError::Poisoned(err)) => { + f.debug_struct("RwLock").field("data", &&**err.get_ref()).finish() + } + Err(TryLockError::WouldBlock) => { + struct LockedPlaceholder; + impl fmt::Debug for LockedPlaceholder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("<locked>") + } + } + + f.debug_struct("RwLock").field("data", &LockedPlaceholder).finish() + } + } + } +} + +#[stable(feature = "rw_lock_default", since = "1.10.0")] +impl<T: Default> Default for RwLock<T> { + /// Creates a new `RwLock<T>`, with the `Default` value for T. + fn default() -> RwLock<T> { + RwLock::new(Default::default()) + } +} + +#[stable(feature = "rw_lock_from", since = "1.24.0")] +impl<T> From<T> for RwLock<T> { + /// Creates a new instance of an `RwLock<T>` which is unlocked. + /// This is equivalent to [`RwLock::new`]. + /// + /// [`RwLock::new`]: ../../std/sync/struct.RwLock.html#method.new + fn from(t: T) -> Self { + RwLock::new(t) + } +} + +impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> { + unsafe fn new(lock: &'rwlock RwLock<T>) -> LockResult<RwLockReadGuard<'rwlock, T>> { + poison::map_result(lock.poison.borrow(), |_| RwLockReadGuard { lock }) + } +} + +impl<'rwlock, T: ?Sized> RwLockWriteGuard<'rwlock, T> { + unsafe fn new(lock: &'rwlock RwLock<T>) -> LockResult<RwLockWriteGuard<'rwlock, T>> { + poison::map_result(lock.poison.borrow(), |guard| RwLockWriteGuard { lock, poison: guard }) + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl<T: fmt::Debug> fmt::Debug for RwLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RwLockReadGuard").field("lock", &self.lock).finish() + } +} + +#[stable(feature = "std_guard_impls", since = "1.20.0")] +impl<T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl<T: fmt::Debug> fmt::Debug for RwLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RwLockWriteGuard").field("lock", &self.lock).finish() + } +} + +#[stable(feature = "std_guard_impls", since = "1.20.0")] +impl<T: ?Sized + fmt::Display> fmt::Display for RwLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.data.get() } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.data.get() } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.data.get() } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> { + fn drop(&mut self) { + unsafe { + self.lock.inner.read_unlock(); + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> { + fn drop(&mut self) { + self.lock.poison.done(&self.poison); + unsafe { + self.lock.inner.write_unlock(); + } + } +} + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests { + use crate::sync::atomic::{AtomicUsize, Ordering}; + use crate::sync::mpsc::channel; + use crate::sync::{Arc, RwLock, TryLockError}; + use crate::thread; + use rand::{self, Rng}; + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + #[test] + fn smoke() { + let l = RwLock::new(()); + drop(l.read().unwrap()); + drop(l.write().unwrap()); + drop((l.read().unwrap(), l.read().unwrap())); + drop(l.write().unwrap()); + } + + #[test] + fn frob() { + const N: u32 = 10; + const M: usize = 1000; + + let r = Arc::new(RwLock::new(())); + + let (tx, rx) = channel::<()>(); + for _ in 0..N { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + for _ in 0..M { + if rng.gen_bool(1.0 / (N as f64)) { + drop(r.write().unwrap()); + } else { + drop(r.read().unwrap()); + } + } + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); + } + + #[test] + fn test_rw_arc_poison_wr() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write().unwrap(); + panic!(); + }) + .join(); + assert!(arc.read().is_err()); + } + + #[test] + fn test_rw_arc_poison_ww() { + let arc = Arc::new(RwLock::new(1)); + assert!(!arc.is_poisoned()); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write().unwrap(); + panic!(); + }) + .join(); + assert!(arc.write().is_err()); + assert!(arc.is_poisoned()); + } + + #[test] + fn test_rw_arc_no_poison_rr() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read().unwrap(); + panic!(); + }) + .join(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 1); + } + #[test] + fn test_rw_arc_no_poison_rw() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read().unwrap(); + panic!() + }) + .join(); + let lock = arc.write().unwrap(); + assert_eq!(*lock, 1); + } + + #[test] + fn test_rw_arc() { + let arc = Arc::new(RwLock::new(0)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + thread::spawn(move || { + let mut lock = arc2.write().unwrap(); + for _ in 0..10 { + let tmp = *lock; + *lock = -1; + thread::yield_now(); + *lock = tmp + 1; + } + tx.send(()).unwrap(); + }); + + // Readers try to catch the writer in the act + let mut children = Vec::new(); + for _ in 0..5 { + let arc3 = arc.clone(); + children.push(thread::spawn(move || { + let lock = arc3.read().unwrap(); + assert!(*lock >= 0); + })); + } + + // Wait for children to pass their asserts + for r in children { + assert!(r.join().is_ok()); + } + + // Wait for writer to finish + rx.recv().unwrap(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 10); + } + + #[test] + fn test_rw_arc_access_in_unwind() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc<RwLock<isize>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + let mut lock = self.i.write().unwrap(); + *lock += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_rwlock_unsized() { + let rw: &RwLock<[i32]> = &RwLock::new([1, 2, 3]); + { + let b = &mut *rw.write().unwrap(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*rw.read().unwrap(), comp); + } + + #[test] + fn test_rwlock_try_write() { + let lock = RwLock::new(0isize); + let read_guard = lock.read().unwrap(); + + let write_result = lock.try_write(); + match write_result { + Err(TryLockError::WouldBlock) => (), + Ok(_) => assert!(false, "try_write should not succeed while read_guard is in scope"), + Err(_) => assert!(false, "unexpected error"), + } + + drop(read_guard); + } + + #[test] + fn test_into_inner() { + let m = RwLock::new(NonCopy(10)); + assert_eq!(m.into_inner().unwrap(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc<AtomicUsize>); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = RwLock::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner().unwrap(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_into_inner_poison() { + let m = Arc::new(RwLock::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.write().unwrap(); + panic!("test panic in inner thread to poison RwLock"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().into_inner() { + Err(e) => assert_eq!(e.into_inner(), NonCopy(10)), + Ok(x) => panic!("into_inner of poisoned RwLock is Ok: {:?}", x), + } + } + + #[test] + fn test_get_mut() { + let mut m = RwLock::new(NonCopy(10)); + *m.get_mut().unwrap() = NonCopy(20); + assert_eq!(m.into_inner().unwrap(), NonCopy(20)); + } + + #[test] + fn test_get_mut_poison() { + let m = Arc::new(RwLock::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.write().unwrap(); + panic!("test panic in inner thread to poison RwLock"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().get_mut() { + Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)), + Ok(x) => panic!("get_mut of poisoned RwLock is Ok: {:?}", x), + } + } +} |
