crossbeam-epoch/src/sync/queue.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Michael-Scott lock-free queue. |
2 | | //! |
3 | | //! Usable with any number of producers and consumers. |
4 | | //! |
5 | | //! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue |
6 | | //! Algorithms. PODC 1996. <http://dl.acm.org/citation.cfm?id=248106> |
7 | | //! |
8 | | //! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a |
9 | | //! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7> |
10 | | |
11 | | use core::mem::MaybeUninit; |
12 | | use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; |
13 | | |
14 | | use crossbeam_utils::CachePadded; |
15 | | |
16 | | use crate::{unprotected, Atomic, Guard, Owned, Shared}; |
17 | | |
18 | | // The representation here is a singly-linked list, with a sentinel node at the front. In general |
19 | | // the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or |
20 | | // all `Blocked` (requests for data from blocked threads). |
21 | 0 | #[derive(Debug)] |
22 | | pub(crate) struct Queue<T> { |
23 | | head: CachePadded<Atomic<Node<T>>>, |
24 | | tail: CachePadded<Atomic<Node<T>>>, |
25 | | } |
26 | | |
27 | | struct Node<T> { |
28 | | /// The slot in which a value of type `T` can be stored. |
29 | | /// |
30 | | /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`. |
31 | | /// For example, the sentinel node in a queue never contains a value: its slot is always empty. |
32 | | /// Other nodes start their life with a push operation and contain a value until it gets popped |
33 | | /// out. After that such empty nodes get added to the collector for destruction. |
34 | | data: MaybeUninit<T>, |
35 | | |
36 | | next: Atomic<Node<T>>, |
37 | | } |
38 | | |
39 | | // Any particular `T` should never be accessed concurrently, so no need for `Sync`. |
40 | | unsafe impl<T: Send> Sync for Queue<T> {} |
41 | | unsafe impl<T: Send> Send for Queue<T> {} |
42 | | |
43 | | impl<T> Queue<T> { |
44 | | /// Create a new, empty queue. |
45 | 70 | pub(crate) fn new() -> Queue<T> { |
46 | 70 | let q = Queue { |
47 | 70 | head: CachePadded::new(Atomic::null()), |
48 | 70 | tail: CachePadded::new(Atomic::null()), |
49 | 70 | }; |
50 | 70 | let sentinel = Owned::new(Node { |
51 | 70 | data: MaybeUninit::uninit(), |
52 | 70 | next: Atomic::null(), |
53 | 70 | }); |
54 | 70 | unsafe { |
55 | 70 | let guard = unprotected(); |
56 | 70 | let sentinel = sentinel.into_shared(guard); |
57 | 70 | q.head.store(sentinel, Relaxed); |
58 | 70 | q.tail.store(sentinel, Relaxed); |
59 | 70 | q |
60 | 70 | } |
61 | 70 | } <crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::new Line | Count | Source | 45 | 43 | pub(crate) fn new() -> Queue<T> { | 46 | 43 | let q = Queue { | 47 | 43 | head: CachePadded::new(Atomic::null()), | 48 | 43 | tail: CachePadded::new(Atomic::null()), | 49 | 43 | }; | 50 | 43 | let sentinel = Owned::new(Node { | 51 | 43 | data: MaybeUninit::uninit(), | 52 | 43 | next: Atomic::null(), | 53 | 43 | }); | 54 | 43 | unsafe { | 55 | 43 | let guard = unprotected(); | 56 | 43 | let sentinel = sentinel.into_shared(guard); | 57 | 43 | q.head.store(sentinel, Relaxed); | 58 | 43 | q.tail.store(sentinel, Relaxed); | 59 | 43 | q | 60 | 43 | } | 61 | 43 | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::new Line | Count | Source | 45 | 16 | pub(crate) fn new() -> Queue<T> { | 46 | 16 | let q = Queue { | 47 | 16 | head: CachePadded::new(Atomic::null()), | 48 | 16 | tail: CachePadded::new(Atomic::null()), | 49 | 16 | }; | 50 | 16 | let sentinel = Owned::new(Node { | 51 | 16 | data: MaybeUninit::uninit(), | 52 | 16 | next: Atomic::null(), | 53 | 16 | }); | 54 | 16 | unsafe { | 55 | 16 | let guard = unprotected(); | 56 | 16 | let sentinel = sentinel.into_shared(guard); | 57 | 16 | q.head.store(sentinel, Relaxed); | 58 | 16 | q.tail.store(sentinel, Relaxed); | 59 | 16 | q | 60 | 16 | } | 61 | 16 | } |
<crossbeam_epoch::sync::queue::Queue<i64>>::new Line | Count | Source | 45 | 10 | pub(crate) fn new() -> Queue<T> { | 46 | 10 | let q = Queue { | 47 | 10 | head: CachePadded::new(Atomic::null()), | 48 | 10 | tail: CachePadded::new(Atomic::null()), | 49 | 10 | }; | 50 | 10 | let sentinel = Owned::new(Node { | 51 | 10 | data: MaybeUninit::uninit(), | 52 | 10 | next: Atomic::null(), | 53 | 10 | }); | 54 | 10 | unsafe { | 55 | 10 | let guard = unprotected(); | 56 | 10 | let sentinel = sentinel.into_shared(guard); | 57 | 10 | q.head.store(sentinel, Relaxed); | 58 | 10 | q.tail.store(sentinel, Relaxed); | 59 | 10 | q | 60 | 10 | } | 61 | 10 | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::new Line | Count | Source | 45 | 1 | pub(crate) fn new() -> Queue<T> { | 46 | 1 | let q = Queue { | 47 | 1 | head: CachePadded::new(Atomic::null()), | 48 | 1 | tail: CachePadded::new(Atomic::null()), | 49 | 1 | }; | 50 | 1 | let sentinel = Owned::new(Node { | 51 | 1 | data: MaybeUninit::uninit(), | 52 | 1 | next: Atomic::null(), | 53 | 1 | }); | 54 | 1 | unsafe { | 55 | 1 | let guard = unprotected(); | 56 | 1 | let sentinel = sentinel.into_shared(guard); | 57 | 1 | q.head.store(sentinel, Relaxed); | 58 | 1 | q.tail.store(sentinel, Relaxed); | 59 | 1 | q | 60 | 1 | } | 61 | 1 | } |
|
62 | | |
63 | | /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on |
64 | | /// success. The queue's `tail` pointer may be updated. |
65 | | #[inline(always)] |
66 | 2.88M | fn push_internal( |
67 | 2.88M | &self, |
68 | 2.88M | onto: Shared<'_, Node<T>>, |
69 | 2.88M | new: Shared<'_, Node<T>>, |
70 | 2.88M | guard: &Guard, |
71 | 2.88M | ) -> bool { |
72 | 2.88M | // is `onto` the actual tail? |
73 | 2.88M | let o = unsafe { onto.deref() }; |
74 | 2.88M | let next = o.next.load(Acquire, guard); |
75 | 2.88M | if unsafe { next.as_ref().is_some() } { |
76 | | // if not, try to "help" by moving the tail pointer forward |
77 | 331 | let _ = self |
78 | 331 | .tail |
79 | 331 | .compare_exchange(onto, next, Release, Relaxed, guard); |
80 | 331 | false |
81 | | } else { |
82 | | // looks like the actual tail; attempt to link in `n` |
83 | 2.88M | let result = o |
84 | 2.88M | .next |
85 | 2.88M | .compare_exchange(Shared::null(), new, Release, Relaxed, guard) |
86 | 2.88M | .is_ok(); |
87 | 2.92M | if result { |
88 | 2.92M | // try to move the tail pointer forward |
89 | 2.92M | let _ = self |
90 | 2.92M | .tail |
91 | 2.92M | .compare_exchange(onto, new, Release, Relaxed, guard); |
92 | 2.92M | }267 |
93 | 2.92M | result |
94 | | } |
95 | 2.92M | } <crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::push_internal Line | Count | Source | 66 | 236 | fn push_internal( | 67 | 236 | &self, | 68 | 236 | onto: Shared<'_, Node<T>>, | 69 | 236 | new: Shared<'_, Node<T>>, | 70 | 236 | guard: &Guard, | 71 | 236 | ) -> bool { | 72 | 236 | // is `onto` the actual tail? | 73 | 236 | let o = unsafe { onto.deref() }; | 74 | 236 | let next = o.next.load(Acquire, guard); | 75 | 236 | if unsafe { next.as_ref().is_some() } { | 76 | | // if not, try to "help" by moving the tail pointer forward | 77 | 11 | let _ = self | 78 | 11 | .tail | 79 | 11 | .compare_exchange(onto, next, Release, Relaxed, guard); | 80 | 11 | false | 81 | | } else { | 82 | | // looks like the actual tail; attempt to link in `n` | 83 | 225 | let result = o | 84 | 225 | .next | 85 | 225 | .compare_exchange(Shared::null(), new, Release, Relaxed, guard) | 86 | 225 | .is_ok(); | 87 | 225 | if result { | 88 | 220 | // try to move the tail pointer forward | 89 | 220 | let _ = self | 90 | 220 | .tail | 91 | 220 | .compare_exchange(onto, new, Release, Relaxed, guard); | 92 | 220 | }6 | 93 | 226 | result | 94 | | } | 95 | 237 | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::push_internal Line | Count | Source | 66 | 4 | fn push_internal( | 67 | 4 | &self, | 68 | 4 | onto: Shared<'_, Node<T>>, | 69 | 4 | new: Shared<'_, Node<T>>, | 70 | 4 | guard: &Guard, | 71 | 4 | ) -> bool { | 72 | 4 | // is `onto` the actual tail? | 73 | 4 | let o = unsafe { onto.deref() }; | 74 | 4 | let next = o.next.load(Acquire, guard); | 75 | 4 | if unsafe { next.as_ref().is_some() } { | 76 | | // if not, try to "help" by moving the tail pointer forward | 77 | 0 | let _ = self | 78 | 0 | .tail | 79 | 0 | .compare_exchange(onto, next, Release, Relaxed, guard); | 80 | 0 | false | 81 | | } else { | 82 | | // looks like the actual tail; attempt to link in `n` | 83 | 4 | let result = o | 84 | 4 | .next | 85 | 4 | .compare_exchange(Shared::null(), new, Release, Relaxed, guard) | 86 | 4 | .is_ok(); | 87 | 4 | if result { | 88 | 4 | // try to move the tail pointer forward | 89 | 4 | let _ = self | 90 | 4 | .tail | 91 | 4 | .compare_exchange(onto, new, Release, Relaxed, guard); | 92 | 4 | }0 | 93 | 4 | result | 94 | | } | 95 | 4 | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::push_internal Line | Count | Source | 66 | 65.4k | fn push_internal( | 67 | 65.4k | &self, | 68 | 65.4k | onto: Shared<'_, Node<T>>, | 69 | 65.4k | new: Shared<'_, Node<T>>, | 70 | 65.4k | guard: &Guard, | 71 | 65.4k | ) -> bool { | 72 | 65.4k | // is `onto` the actual tail? | 73 | 65.4k | let o = unsafe { onto.deref() }; | 74 | 65.4k | let next = o.next.load(Acquire, guard); | 75 | 65.4k | if unsafe { next.as_ref().is_some() } { | 76 | | // if not, try to "help" by moving the tail pointer forward | 77 | 320 | let _ = self | 78 | 320 | .tail | 79 | 320 | .compare_exchange(onto, next, Release, Relaxed, guard); | 80 | 320 | false | 81 | | } else { | 82 | | // looks like the actual tail; attempt to link in `n` | 83 | 65.1k | let result = o | 84 | 65.1k | .next | 85 | 65.1k | .compare_exchange(Shared::null(), new, Release, Relaxed, guard) | 86 | 65.1k | .is_ok(); | 87 | 65.1k | if result { | 88 | 64.9k | // try to move the tail pointer forward | 89 | 64.9k | let _ = self | 90 | 64.9k | .tail | 91 | 64.9k | .compare_exchange(onto, new, Release, Relaxed, guard); | 92 | 64.9k | }261 | 93 | 65.2k | result | 94 | | } | 95 | 65.5k | } |
<crossbeam_epoch::sync::queue::Queue<i64>>::push_internal Line | Count | Source | 66 | 2.81M | fn push_internal( | 67 | 2.81M | &self, | 68 | 2.81M | onto: Shared<'_, Node<T>>, | 69 | 2.81M | new: Shared<'_, Node<T>>, | 70 | 2.81M | guard: &Guard, | 71 | 2.81M | ) -> bool { | 72 | 2.81M | // is `onto` the actual tail? | 73 | 2.81M | let o = unsafe { onto.deref() }; | 74 | 2.81M | let next = o.next.load(Acquire, guard); | 75 | 2.81M | if unsafe { next.as_ref().is_some() } { | 76 | | // if not, try to "help" by moving the tail pointer forward | 77 | 0 | let _ = self | 78 | 0 | .tail | 79 | 0 | .compare_exchange(onto, next, Release, Relaxed, guard); | 80 | 0 | false | 81 | | } else { | 82 | | // looks like the actual tail; attempt to link in `n` | 83 | 2.81M | let result = o | 84 | 2.81M | .next | 85 | 2.81M | .compare_exchange(Shared::null(), new, Release, Relaxed, guard) | 86 | 2.81M | .is_ok(); | 87 | 2.85M | if result { | 88 | 2.85M | // try to move the tail pointer forward | 89 | 2.85M | let _ = self | 90 | 2.85M | .tail | 91 | 2.85M | .compare_exchange(onto, new, Release, Relaxed, guard); | 92 | 2.85M | }0 | 93 | 2.85M | result | 94 | | } | 95 | 2.85M | } |
|
96 | | |
97 | | /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`. |
98 | 2.92M | pub(crate) fn push(&self, t: T, guard: &Guard) { |
99 | 2.92M | let new = Owned::new(Node { |
100 | 2.92M | data: MaybeUninit::new(t), |
101 | 2.92M | next: Atomic::null(), |
102 | 2.92M | }); |
103 | 2.92M | let new = Owned::into_shared(new, guard); |
104 | | |
105 | 2.92M | loop { |
106 | 2.92M | // We push onto the tail, so we'll start optimistically by looking there first. |
107 | 2.92M | let tail = self.tail.load(Acquire, guard); |
108 | 2.92M | |
109 | 2.92M | // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. |
110 | 2.92M | if self.push_internal(tail, new, guard) { |
111 | 2.92M | break; |
112 | 598 | } |
113 | | } |
114 | 2.92M | } <crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::push Line | Count | Source | 98 | 219 | pub(crate) fn push(&self, t: T, guard: &Guard) { | 99 | 219 | let new = Owned::new(Node { | 100 | 219 | data: MaybeUninit::new(t), | 101 | 219 | next: Atomic::null(), | 102 | 219 | }); | 103 | 219 | let new = Owned::into_shared(new, guard); | 104 | | | 105 | 236 | loop { | 106 | 236 | // We push onto the tail, so we'll start optimistically by looking there first. | 107 | 236 | let tail = self.tail.load(Acquire, guard); | 108 | 236 | | 109 | 236 | // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. | 110 | 236 | if self.push_internal(tail, new, guard) { | 111 | 219 | break; | 112 | 17 | } | 113 | | } | 114 | 219 | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::push Line | Count | Source | 98 | 64.9k | pub(crate) fn push(&self, t: T, guard: &Guard) { | 99 | 64.9k | let new = Owned::new(Node { | 100 | 64.9k | data: MaybeUninit::new(t), | 101 | 64.9k | next: Atomic::null(), | 102 | 64.9k | }); | 103 | 64.9k | let new = Owned::into_shared(new, guard); | 104 | | | 105 | 65.5k | loop { | 106 | 65.5k | // We push onto the tail, so we'll start optimistically by looking there first. | 107 | 65.5k | let tail = self.tail.load(Acquire, guard); | 108 | 65.5k | | 109 | 65.5k | // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. | 110 | 65.5k | if self.push_internal(tail, new, guard) { | 111 | 64.9k | break; | 112 | 581 | } | 113 | | } | 114 | 64.9k | } |
<crossbeam_epoch::sync::queue::Queue<i64>>::push Line | Count | Source | 98 | 2.86M | pub(crate) fn push(&self, t: T, guard: &Guard) { | 99 | 2.86M | let new = Owned::new(Node { | 100 | 2.86M | data: MaybeUninit::new(t), | 101 | 2.86M | next: Atomic::null(), | 102 | 2.86M | }); | 103 | 2.86M | let new = Owned::into_shared(new, guard); | 104 | | | 105 | 2.86M | loop { | 106 | 2.86M | // We push onto the tail, so we'll start optimistically by looking there first. | 107 | 2.86M | let tail = self.tail.load(Acquire, guard); | 108 | 2.86M | | 109 | 2.86M | // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. | 110 | 2.86M | if self.push_internal(tail, new, guard) { | 111 | 2.86M | break; | 112 | 0 | } | 113 | | } | 114 | 2.86M | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::push Line | Count | Source | 98 | 4 | pub(crate) fn push(&self, t: T, guard: &Guard) { | 99 | 4 | let new = Owned::new(Node { | 100 | 4 | data: MaybeUninit::new(t), | 101 | 4 | next: Atomic::null(), | 102 | 4 | }); | 103 | 4 | let new = Owned::into_shared(new, guard); | 104 | | | 105 | 4 | loop { | 106 | 4 | // We push onto the tail, so we'll start optimistically by looking there first. | 107 | 4 | let tail = self.tail.load(Acquire, guard); | 108 | 4 | | 109 | 4 | // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. | 110 | 4 | if self.push_internal(tail, new, guard) { | 111 | 4 | break; | 112 | 0 | } | 113 | | } | 114 | 4 | } |
|
115 | | |
116 | | /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop. |
117 | | #[inline(always)] |
118 | 7.24M | fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> { |
119 | 7.24M | let head = self.head.load(Acquire, guard); |
120 | 7.24M | let h = unsafe { head.deref() }; |
121 | 7.24M | let next = h.next.load(Acquire, guard); |
122 | 7.24M | match unsafe { next.as_ref() } { |
123 | 3.20M | Some(n) => unsafe { |
124 | 3.20M | self.head |
125 | 3.20M | .compare_exchange(head, next, Release, Relaxed, guard) |
126 | 3.20M | .map(|_| { |
127 | 2.90M | let tail = self.tail.load(Relaxed, guard); |
128 | 2.90M | // Advance the tail so that we don't retire a pointer to a reachable node. |
129 | 2.90M | if head == tail { |
130 | 281 | let _ = self |
131 | 281 | .tail |
132 | 281 | .compare_exchange(tail, next, Release, Relaxed, guard); |
133 | 2.90M | } |
134 | 2.90M | guard.defer_destroy(head); |
135 | 2.90M | // TODO: Replace with MaybeUninit::read when api is stable |
136 | 2.90M | Some(n.data.as_ptr().read()) |
137 | 3.20M | }) <crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_internal::{closure#0} Line | Count | Source | 126 | 2 | .map(|_| { | 127 | 2 | let tail = self.tail.load(Relaxed, guard); | 128 | 2 | // Advance the tail so that we don't retire a pointer to a reachable node. | 129 | 2 | if head == tail { | 130 | 0 | let _ = self | 131 | 0 | .tail | 132 | 0 | .compare_exchange(tail, next, Release, Relaxed, guard); | 133 | 2 | } | 134 | 2 | guard.defer_destroy(head); | 135 | 2 | // TODO: Replace with MaybeUninit::read when api is stable | 136 | 2 | Some(n.data.as_ptr().read()) | 137 | 2 | }) |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::pop_internal::{closure#0} Line | Count | Source | 126 | 4 | .map(|_| { | 127 | 4 | let tail = self.tail.load(Relaxed, guard); | 128 | 4 | // Advance the tail so that we don't retire a pointer to a reachable node. | 129 | 4 | if head == tail { | 130 | 0 | let _ = self | 131 | 0 | .tail | 132 | 0 | .compare_exchange(tail, next, Release, Relaxed, guard); | 133 | 4 | } | 134 | 4 | guard.defer_destroy(head); | 135 | 4 | // TODO: Replace with MaybeUninit::read when api is stable | 136 | 4 | Some(n.data.as_ptr().read()) | 137 | 4 | }) |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_internal::{closure#0} Line | Count | Source | 126 | 221 | .map(|_| { | 127 | 221 | let tail = self.tail.load(Relaxed, guard); | 128 | 221 | // Advance the tail so that we don't retire a pointer to a reachable node. | 129 | 221 | if head == tail { | 130 | 0 | let _ = self | 131 | 0 | .tail | 132 | 0 | .compare_exchange(tail, next, Release, Relaxed, guard); | 133 | 221 | } | 134 | 221 | guard.defer_destroy(head); | 135 | 221 | // TODO: Replace with MaybeUninit::read when api is stable | 136 | 221 | Some(n.data.as_ptr().read()) | 137 | 221 | }) |
<crossbeam_epoch::sync::queue::Queue<i64>>::pop_internal::{closure#0} Line | Count | Source | 126 | 2.90M | .map(|_| { | 127 | 2.90M | let tail = self.tail.load(Relaxed, guard); | 128 | 2.90M | // Advance the tail so that we don't retire a pointer to a reachable node. | 129 | 2.90M | if head == tail { | 130 | 281 | let _ = self | 131 | 281 | .tail | 132 | 281 | .compare_exchange(tail, next, Release, Relaxed, guard); | 133 | 2.90M | } | 134 | 2.90M | guard.defer_destroy(head); | 135 | 2.90M | // TODO: Replace with MaybeUninit::read when api is stable | 136 | 2.90M | Some(n.data.as_ptr().read()) | 137 | 2.90M | }) |
|
138 | 3.20M | .map_err(|_| ()394k ) |
139 | | }, |
140 | 4.04M | None => Ok(None), |
141 | | } |
142 | 7.24M | } <crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_internal Line | Count | Source | 118 | 4 | fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> { | 119 | 4 | let head = self.head.load(Acquire, guard); | 120 | 4 | let h = unsafe { head.deref() }; | 121 | 4 | let next = h.next.load(Acquire, guard); | 122 | 4 | match unsafe { next.as_ref() } { | 123 | 2 | Some(n) => unsafe { | 124 | 2 | self.head | 125 | 2 | .compare_exchange(head, next, Release, Relaxed, guard) | 126 | 2 | .map(|_| { | 127 | | let tail = self.tail.load(Relaxed, guard); | 128 | | // Advance the tail so that we don't retire a pointer to a reachable node. | 129 | | if head == tail { | 130 | | let _ = self | 131 | | .tail | 132 | | .compare_exchange(tail, next, Release, Relaxed, guard); | 133 | | } | 134 | | guard.defer_destroy(head); | 135 | | // TODO: Replace with MaybeUninit::read when api is stable | 136 | | Some(n.data.as_ptr().read()) | 137 | 2 | }) | 138 | 2 | .map_err(|_| ()) | 139 | | }, | 140 | 2 | None => Ok(None), | 141 | | } | 142 | 4 | } |
<crossbeam_epoch::sync::queue::Queue<i64>>::pop_internal Line | Count | Source | 118 | 5.28M | fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> { | 119 | 5.28M | let head = self.head.load(Acquire, guard); | 120 | 5.28M | let h = unsafe { head.deref() }; | 121 | 5.28M | let next = h.next.load(Acquire, guard); | 122 | 5.28M | match unsafe { next.as_ref() } { | 123 | 3.20M | Some(n) => unsafe { | 124 | 3.20M | self.head | 125 | 3.20M | .compare_exchange(head, next, Release, Relaxed, guard) | 126 | 3.20M | .map(|_| { | 127 | | let tail = self.tail.load(Relaxed, guard); | 128 | | // Advance the tail so that we don't retire a pointer to a reachable node. | 129 | | if head == tail { | 130 | | let _ = self | 131 | | .tail | 132 | | .compare_exchange(tail, next, Release, Relaxed, guard); | 133 | | } | 134 | | guard.defer_destroy(head); | 135 | | // TODO: Replace with MaybeUninit::read when api is stable | 136 | | Some(n.data.as_ptr().read()) | 137 | 3.20M | }) | 138 | 3.20M | .map_err(|_| ()) | 139 | | }, | 140 | 2.08M | None => Ok(None), | 141 | | } | 142 | 5.28M | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::pop_internal Line | Count | Source | 118 | 1.96M | fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> { | 119 | 1.96M | let head = self.head.load(Acquire, guard); | 120 | 1.96M | let h = unsafe { head.deref() }; | 121 | 1.96M | let next = h.next.load(Acquire, guard); | 122 | 1.96M | match unsafe { next.as_ref() } { | 123 | 4 | Some(n) => unsafe { | 124 | 4 | self.head | 125 | 4 | .compare_exchange(head, next, Release, Relaxed, guard) | 126 | 4 | .map(|_| { | 127 | | let tail = self.tail.load(Relaxed, guard); | 128 | | // Advance the tail so that we don't retire a pointer to a reachable node. | 129 | | if head == tail { | 130 | | let _ = self | 131 | | .tail | 132 | | .compare_exchange(tail, next, Release, Relaxed, guard); | 133 | | } | 134 | | guard.defer_destroy(head); | 135 | | // TODO: Replace with MaybeUninit::read when api is stable | 136 | | Some(n.data.as_ptr().read()) | 137 | 4 | }) | 138 | 4 | .map_err(|_| ()) | 139 | | }, | 140 | 1.96M | None => Ok(None), | 141 | | } | 142 | 1.96M | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_internal Line | Count | Source | 118 | 236 | fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> { | 119 | 236 | let head = self.head.load(Acquire, guard); | 120 | 236 | let h = unsafe { head.deref() }; | 121 | 236 | let next = h.next.load(Acquire, guard); | 122 | 236 | match unsafe { next.as_ref() } { | 123 | 221 | Some(n) => unsafe { | 124 | 221 | self.head | 125 | 221 | .compare_exchange(head, next, Release, Relaxed, guard) | 126 | 221 | .map(|_| { | 127 | | let tail = self.tail.load(Relaxed, guard); | 128 | | // Advance the tail so that we don't retire a pointer to a reachable node. | 129 | | if head == tail { | 130 | | let _ = self | 131 | | .tail | 132 | | .compare_exchange(tail, next, Release, Relaxed, guard); | 133 | | } | 134 | | guard.defer_destroy(head); | 135 | | // TODO: Replace with MaybeUninit::read when api is stable | 136 | | Some(n.data.as_ptr().read()) | 137 | 221 | }) | 138 | 221 | .map_err(|_| ()) | 139 | | }, | 140 | 15 | None => Ok(None), | 141 | | } | 142 | 236 | } |
|
143 | | |
144 | | /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue |
145 | | /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop. |
146 | | #[inline(always)] |
147 | 3.74M | fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> |
148 | 3.74M | where |
149 | 3.74M | T: Sync, |
150 | 3.74M | F: Fn(&T) -> bool, |
151 | 3.74M | { |
152 | 3.74M | let head = self.head.load(Acquire, guard); |
153 | 3.74M | let h = unsafe { head.deref() }; |
154 | 3.74M | let next = h.next.load(Acquire, guard); |
155 | 3.74M | match unsafe { next.as_ref() } { |
156 | 162k | Some(n66.1k ) if condition(unsafe { &*n.data.as_ptr() }) => unsafe { |
157 | 66.1k | self.head |
158 | 66.1k | .compare_exchange(head, next, Release, Relaxed, guard) |
159 | 66.1k | .map(|_| { |
160 | 64.8k | let tail = self.tail.load(Relaxed, guard); |
161 | 64.8k | // Advance the tail so that we don't retire a pointer to a reachable node. |
162 | 64.8k | if head == tail { |
163 | 0 | let _ = self |
164 | 0 | .tail |
165 | 0 | .compare_exchange(tail, next, Release, Relaxed, guard); |
166 | 64.8k | } |
167 | 64.8k | guard.defer_destroy(head); |
168 | 64.8k | Some(n.data.as_ptr().read()) |
169 | 66.1k | }) <crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}>::{closure#0} Line | Count | Source | 159 | 165 | .map(|_| { | 160 | 165 | let tail = self.tail.load(Relaxed, guard); | 161 | 165 | // Advance the tail so that we don't retire a pointer to a reachable node. | 162 | 165 | if head == tail { | 163 | 0 | let _ = self | 164 | 0 | .tail | 165 | 0 | .compare_exchange(tail, next, Release, Relaxed, guard); | 166 | 165 | } | 167 | 165 | guard.defer_destroy(head); | 168 | 165 | Some(n.data.as_ptr().read()) | 169 | 165 | }) |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}>::{closure#0} Line | Count | Source | 159 | 64.7k | .map(|_| { | 160 | 64.7k | let tail = self.tail.load(Relaxed, guard); | 161 | 64.7k | // Advance the tail so that we don't retire a pointer to a reachable node. | 162 | 64.7k | if head == tail { | 163 | 0 | let _ = self | 164 | 0 | .tail | 165 | 0 | .compare_exchange(tail, next, Release, Relaxed, guard); | 166 | 64.7k | } | 167 | 64.7k | guard.defer_destroy(head); | 168 | 64.7k | Some(n.data.as_ptr().read()) | 169 | 64.7k | }) |
|
170 | 66.1k | .map_err(|_| ()1.37k ) <crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}>::{closure#1} Line | Count | Source | 170 | 10 | .map_err(|_| ()) |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}>::{closure#1} Line | Count | Source | 170 | 1.36k | .map_err(|_| ()) |
|
171 | | }, |
172 | 3.67M | None | Some(_) => Ok(None), |
173 | | } |
174 | 3.74M | } <crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}> Line | Count | Source | 147 | 9.88k | fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> | 148 | 9.88k | where | 149 | 9.88k | T: Sync, | 150 | 9.88k | F: Fn(&T) -> bool, | 151 | 9.88k | { | 152 | 9.88k | let head = self.head.load(Acquire, guard); | 153 | 9.88k | let h = unsafe { head.deref() }; | 154 | 9.88k | let next = h.next.load(Acquire, guard); | 155 | 9.88k | match unsafe { next.as_ref() } { | 156 | 6.15k | Some(n175 ) if condition(unsafe { &*n.data.as_ptr() }) => unsafe { | 157 | 175 | self.head | 158 | 175 | .compare_exchange(head, next, Release, Relaxed, guard) | 159 | 175 | .map(|_| { | 160 | | let tail = self.tail.load(Relaxed, guard); | 161 | | // Advance the tail so that we don't retire a pointer to a reachable node. | 162 | | if head == tail { | 163 | | let _ = self | 164 | | .tail | 165 | | .compare_exchange(tail, next, Release, Relaxed, guard); | 166 | | } | 167 | | guard.defer_destroy(head); | 168 | | Some(n.data.as_ptr().read()) | 169 | 175 | }) | 170 | 175 | .map_err(|_| ()) | 171 | | }, | 172 | 9.70k | None | Some(_) => Ok(None), | 173 | | } | 174 | 9.87k | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}> Line | Count | Source | 147 | 3.73M | fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> | 148 | 3.73M | where | 149 | 3.73M | T: Sync, | 150 | 3.73M | F: Fn(&T) -> bool, | 151 | 3.73M | { | 152 | 3.73M | let head = self.head.load(Acquire, guard); | 153 | 3.73M | let h = unsafe { head.deref() }; | 154 | 3.73M | let next = h.next.load(Acquire, guard); | 155 | 3.73M | match unsafe { next.as_ref() } { | 156 | 156k | Some(n65.9k ) if condition(unsafe { &*n.data.as_ptr() }) => unsafe { | 157 | 65.9k | self.head | 158 | 65.9k | .compare_exchange(head, next, Release, Relaxed, guard) | 159 | 65.9k | .map(|_| { | 160 | | let tail = self.tail.load(Relaxed, guard); | 161 | | // Advance the tail so that we don't retire a pointer to a reachable node. | 162 | | if head == tail { | 163 | | let _ = self | 164 | | .tail | 165 | | .compare_exchange(tail, next, Release, Relaxed, guard); | 166 | | } | 167 | | guard.defer_destroy(head); | 168 | | Some(n.data.as_ptr().read()) | 169 | 65.9k | }) | 170 | 65.9k | .map_err(|_| ()) | 171 | | }, | 172 | 3.66M | None | Some(_) => Ok(None), | 173 | | } | 174 | 3.73M | } |
|
175 | | |
176 | | /// Attempts to dequeue from the front. |
177 | | /// |
178 | | /// Returns `None` if the queue is observed to be empty. |
179 | 6.81M | pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> { |
180 | | loop { |
181 | 7.21M | if let Ok(head6.81M ) = self.pop_internal(guard) { |
182 | 6.81M | return head; |
183 | 394k | } |
184 | | } |
185 | 6.81M | } <crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::try_pop Line | Count | Source | 179 | 4 | pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> { | 180 | | loop { | 181 | 4 | if let Ok(head) = self.pop_internal(guard) { | 182 | 4 | return head; | 183 | 0 | } | 184 | | } | 185 | 4 | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::try_pop Line | Count | Source | 179 | 236 | pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> { | 180 | | loop { | 181 | 236 | if let Ok(head) = self.pop_internal(guard) { | 182 | 236 | return head; | 183 | 0 | } | 184 | | } | 185 | 236 | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::try_pop Line | Count | Source | 179 | 1.95M | pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> { | 180 | | loop { | 181 | 1.95M | if let Ok(head) = self.pop_internal(guard) { | 182 | 1.95M | return head; | 183 | 0 | } | 184 | | } | 185 | 1.95M | } |
<crossbeam_epoch::sync::queue::Queue<i64>>::try_pop Line | Count | Source | 179 | 4.86M | pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> { | 180 | | loop { | 181 | 5.25M | if let Ok(head4.86M ) = self.pop_internal(guard) { | 182 | 4.86M | return head; | 183 | 394k | } | 184 | | } | 185 | 4.86M | } |
|
186 | | |
187 | | /// Attempts to dequeue from the front, if the item satisfies the given condition. |
188 | | /// |
189 | | /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given |
190 | | /// condition. |
191 | 3.73M | pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> |
192 | 3.73M | where |
193 | 3.73M | T: Sync, |
194 | 3.73M | F: Fn(&T) -> bool, |
195 | 3.73M | { |
196 | | loop { |
197 | 3.73M | if let Ok(head3.73M ) = self.pop_if_internal(&condition, guard) { |
198 | 3.73M | return head; |
199 | 1.37k | } |
200 | | } |
201 | 3.73M | } <crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::try_pop_if::<&<crossbeam_epoch::internal::Global>::collect::{closure#0}> Line | Count | Source | 191 | 9.87k | pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> | 192 | 9.87k | where | 193 | 9.87k | T: Sync, | 194 | 9.87k | F: Fn(&T) -> bool, | 195 | 9.87k | { | 196 | | loop { | 197 | 9.88k | if let Ok(head9.87k ) = self.pop_if_internal(&condition, guard) { | 198 | 9.87k | return head; | 199 | 10 | } | 200 | | } | 201 | 9.87k | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::try_pop_if::<&<crossbeam_epoch::internal::Global>::collect::{closure#0}> Line | Count | Source | 191 | 3.72M | pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> | 192 | 3.72M | where | 193 | 3.72M | T: Sync, | 194 | 3.72M | F: Fn(&T) -> bool, | 195 | 3.72M | { | 196 | | loop { | 197 | 3.72M | if let Ok(head3.72M ) = self.pop_if_internal(&condition, guard) { | 198 | 3.72M | return head; | 199 | 1.36k | } | 200 | | } | 201 | 3.72M | } |
|
202 | | } |
203 | | |
204 | | impl<T> Drop for Queue<T> { |
205 | 28 | fn drop(&mut self) { |
206 | | unsafe { |
207 | 28 | let guard = unprotected(); |
208 | | |
209 | 161k | while self.try_pop(guard).is_some() {}161k |
210 | | |
211 | | // Destroy the remaining sentinel node. |
212 | 28 | let sentinel = self.head.load(Relaxed, guard); |
213 | 28 | drop(sentinel.into_owned()); |
214 | 28 | } |
215 | 28 | } <crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag> as core::ops::drop::Drop>::drop Line | Count | Source | 205 | 2 | fn drop(&mut self) { | 206 | | unsafe { | 207 | 2 | let guard = unprotected(); | 208 | | | 209 | 4 | while self.try_pop(guard).is_some() {}2 | 210 | | | 211 | | // Destroy the remaining sentinel node. | 212 | 2 | let sentinel = self.head.load(Relaxed, guard); | 213 | 2 | drop(sentinel.into_owned()); | 214 | 2 | } | 215 | 2 | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag> as core::ops::drop::Drop>::drop Line | Count | Source | 205 | 15 | fn drop(&mut self) { | 206 | | unsafe { | 207 | 15 | let guard = unprotected(); | 208 | | | 209 | 236 | while self.try_pop(guard).is_some() {}221 | 210 | | | 211 | | // Destroy the remaining sentinel node. | 212 | 15 | let sentinel = self.head.load(Relaxed, guard); | 213 | 15 | drop(sentinel.into_owned()); | 214 | 15 | } | 215 | 15 | } |
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR> as core::ops::drop::Drop>::drop Line | Count | Source | 205 | 1 | fn drop(&mut self) { | 206 | | unsafe { | 207 | 1 | let guard = unprotected(); | 208 | | | 209 | 1 | while self.try_pop(guard).is_some() {}0 | 210 | | | 211 | | // Destroy the remaining sentinel node. | 212 | 1 | let sentinel = self.head.load(Relaxed, guard); | 213 | 1 | drop(sentinel.into_owned()); | 214 | 1 | } | 215 | 1 | } |
<crossbeam_epoch::sync::queue::Queue<i64> as core::ops::drop::Drop>::drop Line | Count | Source | 205 | 10 | fn drop(&mut self) { | 206 | | unsafe { | 207 | 10 | let guard = unprotected(); | 208 | | | 209 | 161k | while self.try_pop(guard).is_some() {}161k | 210 | | | 211 | | // Destroy the remaining sentinel node. | 212 | 10 | let sentinel = self.head.load(Relaxed, guard); | 213 | 10 | drop(sentinel.into_owned()); | 214 | 10 | } | 215 | 10 | } |
|
216 | | } |
217 | | |
218 | | #[cfg(all(test, not(loom_crossbeam)))] |
219 | | mod test { |
220 | | use super::*; |
221 | | use crate::pin; |
222 | | use crossbeam_utils::thread; |
223 | | |
224 | | struct Queue<T> { |
225 | | queue: super::Queue<T>, |
226 | | } |
227 | | |
228 | | impl<T> Queue<T> { |
229 | 11 | pub(crate) fn new() -> Queue<T> { |
230 | 11 | Queue { |
231 | 11 | queue: super::Queue::new(), |
232 | 11 | } |
233 | 11 | } <crossbeam_epoch::sync::queue::test::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::new Line | Count | Source | 229 | 1 | pub(crate) fn new() -> Queue<T> { | 230 | 1 | Queue { | 231 | 1 | queue: super::Queue::new(), | 232 | 1 | } | 233 | 1 | } |
<crossbeam_epoch::sync::queue::test::Queue<i64>>::new Line | Count | Source | 229 | 10 | pub(crate) fn new() -> Queue<T> { | 230 | 10 | Queue { | 231 | 10 | queue: super::Queue::new(), | 232 | 10 | } | 233 | 10 | } |
|
234 | | |
235 | 2.85M | pub(crate) fn push(&self, t: T) { |
236 | 2.85M | let guard = &pin(); |
237 | 2.85M | self.queue.push(t, guard); |
238 | 2.85M | } <crossbeam_epoch::sync::queue::test::Queue<i64>>::push Line | Count | Source | 235 | 2.85M | pub(crate) fn push(&self, t: T) { | 236 | 2.85M | let guard = &pin(); | 237 | 2.85M | self.queue.push(t, guard); | 238 | 2.85M | } |
<crossbeam_epoch::sync::queue::test::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::push Line | Count | Source | 235 | 4 | pub(crate) fn push(&self, t: T) { | 236 | 4 | let guard = &pin(); | 237 | 4 | self.queue.push(t, guard); | 238 | 4 | } |
|
239 | | |
240 | 21 | pub(crate) fn is_empty(&self) -> bool { |
241 | 21 | let guard = &pin(); |
242 | 21 | let head = self.queue.head.load(Acquire, guard); |
243 | 21 | let h = unsafe { head.deref() }; |
244 | 21 | h.next.load(Acquire, guard).is_null() |
245 | 21 | } <crossbeam_epoch::sync::queue::test::Queue<i64>>::is_empty Line | Count | Source | 240 | 20 | pub(crate) fn is_empty(&self) -> bool { | 241 | 20 | let guard = &pin(); | 242 | 20 | let head = self.queue.head.load(Acquire, guard); | 243 | 20 | let h = unsafe { head.deref() }; | 244 | 20 | h.next.load(Acquire, guard).is_null() | 245 | 20 | } |
<crossbeam_epoch::sync::queue::test::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::is_empty Line | Count | Source | 240 | 1 | pub(crate) fn is_empty(&self) -> bool { | 241 | 1 | let guard = &pin(); | 242 | 1 | let head = self.queue.head.load(Acquire, guard); | 243 | 1 | let h = unsafe { head.deref() }; | 244 | 1 | h.next.load(Acquire, guard).is_null() | 245 | 1 | } |
|
246 | | |
247 | 6.71M | pub(crate) fn try_pop(&self) -> Option<T> { |
248 | 6.71M | let guard = &pin(); |
249 | 6.71M | self.queue.try_pop(guard) |
250 | 6.71M | } <crossbeam_epoch::sync::queue::test::Queue<i64>>::try_pop Line | Count | Source | 247 | 4.74M | pub(crate) fn try_pop(&self) -> Option<T> { | 248 | 4.74M | let guard = &pin(); | 249 | 4.74M | self.queue.try_pop(guard) | 250 | 4.74M | } |
<crossbeam_epoch::sync::queue::test::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::try_pop Line | Count | Source | 247 | 1.96M | pub(crate) fn try_pop(&self) -> Option<T> { | 248 | 1.96M | let guard = &pin(); | 249 | 1.96M | self.queue.try_pop(guard) | 250 | 1.96M | } |
|
251 | | |
252 | 1.00M | pub(crate) fn pop(&self) -> T { |
253 | 1.01M | loop { |
254 | 1.01M | match self.try_pop() { |
255 | 1.01M | None => continue, |
256 | 1.00M | Some(t) => return t, |
257 | 1.00M | } |
258 | 1.00M | } |
259 | 1.00M | } |
260 | | } |
261 | | |
/// Number of elements (or pop attempts) used by the concurrent tests.
const CONC_COUNT: i64 = 1_000_000;
263 | | |
/// A single pushed element comes back from `try_pop`, and emptiness is tracked around it.
#[test]
fn push_try_pop_1() {
    let queue = Queue::<i64>::new();
    assert!(queue.is_empty());

    queue.push(37);
    assert!(!queue.is_empty());

    let popped = queue.try_pop();
    assert_eq!(popped, Some(37));
    assert!(queue.is_empty());
}
|
273 | | |
/// Two pushed elements come back from `try_pop` in insertion order.
#[test]
fn push_try_pop_2() {
    let queue = Queue::<i64>::new();
    assert!(queue.is_empty());

    queue.push(37);
    queue.push(48);

    let first = queue.try_pop();
    assert_eq!(first, Some(37));
    // One element is still queued.
    assert!(!queue.is_empty());

    let second = queue.try_pop();
    assert_eq!(second, Some(48));
    assert!(queue.is_empty());
}
|
285 | | |
/// 200 elements pushed sequentially are returned by `try_pop` in the same order.
#[test]
fn push_try_pop_many_seq() {
    let queue = Queue::<i64>::new();
    assert!(queue.is_empty());

    (0..200).for_each(|value| queue.push(value));
    assert!(!queue.is_empty());

    for expected in 0..200 {
        let popped = queue.try_pop();
        assert_eq!(popped, Some(expected));
    }
    assert!(queue.is_empty());
}
|
299 | | |
/// A single pushed element comes back from the spinning `pop`.
#[test]
fn push_pop_1() {
    let queue = Queue::<i64>::new();
    assert!(queue.is_empty());

    queue.push(37);
    assert!(!queue.is_empty());

    let popped = queue.pop();
    assert_eq!(popped, 37);
    assert!(queue.is_empty());
}
|
309 | | |
/// Two pushed elements come back from `pop` in insertion order.
#[test]
fn push_pop_2() {
    let queue = Queue::<i64>::new();
    queue.push(37);
    queue.push(48);

    let first = queue.pop();
    assert_eq!(first, 37);
    let second = queue.pop();
    assert_eq!(second, 48);
}
|
318 | | |
/// 200 elements pushed sequentially are returned by `pop` in the same order.
#[test]
fn push_pop_many_seq() {
    let queue = Queue::<i64>::new();
    assert!(queue.is_empty());

    (0..200).for_each(|value| queue.push(value));
    assert!(!queue.is_empty());

    for expected in 0..200 {
        let popped = queue.pop();
        assert_eq!(popped, expected);
    }
    assert!(queue.is_empty());
}
|
332 | | |
/// One producer and one consumer: the consumer must see `0..CONC_COUNT` in order via `try_pop`.
#[test]
fn push_try_pop_many_spsc() {
    let queue = Queue::<i64>::new();
    assert!(queue.is_empty());

    thread::scope(|scope| {
        // Consumer: keeps polling until every value has been seen in order.
        scope.spawn(|_| {
            let mut expected = 0;

            while expected < CONC_COUNT {
                match queue.try_pop() {
                    Some(value) => {
                        assert_eq!(value, expected);
                        expected += 1;
                    }
                    // Nothing available yet; poll again.
                    None => {}
                }
            }
        });

        // Producer: runs on the thread that opened the scope.
        (0..CONC_COUNT).for_each(|value| queue.push(value));
    })
    .unwrap();
}
|
356 | | |
/// One producer and three consumers: every consumer must see strictly increasing values.
#[test]
fn push_try_pop_many_spmc() {
    /// Makes at most `CONC_COUNT` pop attempts, checking that the observed values strictly
    /// increase, and stops early once the final value has been received.
    fn recv(_t: i32, q: &Queue<i64>) {
        let last = CONC_COUNT - 1;
        let mut cur = -1;

        for _attempt in 0..CONC_COUNT {
            let elem = match q.try_pop() {
                Some(elem) => elem,
                None => continue,
            };

            assert!(elem > cur);
            cur = elem;

            if cur == last {
                break;
            }
        }
    }

    let queue = Queue::<i64>::new();
    assert!(queue.is_empty());

    thread::scope(|scope| {
        let shared = &queue;

        // Consumers.
        for id in 0..3 {
            scope.spawn(move |_| recv(id, shared));
        }

        // Producer.
        scope.spawn(move |_| {
            (0..CONC_COUNT).for_each(|value| shared.push(value));
        });
    })
    .unwrap();
}
|
389 | | |
/// Several producers and consumers: each consumer checks that the `Left` values and the
/// `Right` values it received are each in sorted order.
#[test]
fn push_try_pop_many_mpmc() {
    enum LR {
        Left(i64),
        Right(i64),
    }

    let queue = Queue::<LR>::new();
    assert!(queue.is_empty());

    thread::scope(|scope| {
        for _round in 0..2 {
            // NOTE: `CONC_COUNT - 1..CONC_COUNT` contains exactly one value, so each producer
            // below pushes a single element.
            scope.spawn(|_| {
                (CONC_COUNT - 1..CONC_COUNT).for_each(|value| queue.push(LR::Left(value)));
            });
            scope.spawn(|_| {
                (CONC_COUNT - 1..CONC_COUNT).for_each(|value| queue.push(LR::Right(value)));
            });

            // Consumer: makes `CONC_COUNT` pop attempts and sorts what it got by variant.
            scope.spawn(|_| {
                let mut lefts = Vec::new();
                let mut rights = Vec::new();

                for _attempt in 0..CONC_COUNT {
                    match queue.try_pop() {
                        Some(LR::Left(value)) => lefts.push(value),
                        Some(LR::Right(value)) => rights.push(value),
                        None => {}
                    }
                }

                let mut sorted_lefts = lefts.clone();
                sorted_lefts.sort();
                let mut sorted_rights = rights.clone();
                sorted_rights.sort();

                // The received order must already be the sorted order.
                assert_eq!(lefts, sorted_lefts);
                assert_eq!(rights, sorted_rights);
            });
        }
    })
    .unwrap();
}
|
435 | | |
/// One producer and one consumer: the consumer must see `0..CONC_COUNT` in order via `pop`.
#[test]
fn push_pop_many_spsc() {
    let queue = Queue::<i64>::new();

    thread::scope(|scope| {
        // Consumer: `pop` spins until an element is available.
        scope.spawn(|_| {
            for expected in 0..CONC_COUNT {
                assert_eq!(queue.pop(), expected);
            }
        });

        // Producer: runs on the thread that opened the scope.
        (0..CONC_COUNT).for_each(|value| queue.push(value));
    })
    .unwrap();

    assert!(queue.is_empty());
}
456 | | |
/// Calling `is_empty` repeatedly must not remove anything from the queue.
#[test]
fn is_empty_dont_pop() {
    let queue = Queue::<i64>::new();
    for _ in 0..2 {
        queue.push(20);
    }

    // Two checks in a row: the first one must not have consumed an element.
    for _ in 0..2 {
        assert!(!queue.is_empty());
    }

    let popped = queue.try_pop();
    assert!(popped.is_some());
}
|
466 | | } |