crossbeam-queue/src/array_queue.rs
Line | Count | Source |
1 | | //! The implementation is based on Dmitry Vyukov's bounded MPMC queue. |
2 | | //! |
3 | | //! Source: |
4 | | //! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue> |
5 | | |
6 | | use alloc::boxed::Box; |
7 | | use core::cell::UnsafeCell; |
8 | | use core::fmt; |
9 | | use core::marker::PhantomData; |
10 | | use core::mem::{self, MaybeUninit}; |
11 | | use core::sync::atomic::{self, AtomicUsize, Ordering}; |
12 | | |
13 | | use crossbeam_utils::{Backoff, CachePadded}; |
14 | | |
/// A slot in a queue.
struct Slot<T> {
    /// The current stamp.
    ///
    /// If the stamp equals the tail, this node will be next written to. If it equals head + 1,
    /// this node will be next read from.
    stamp: AtomicUsize,

    /// The value in this slot.
    ///
    /// `UnsafeCell` permits writes through a shared `&ArrayQueue<T>`; `MaybeUninit` because the
    /// slot holds no value until a push writes one. All access is synchronized through `stamp`:
    /// a thread may touch `value` only after winning the CAS on head/tail for this slot.
    value: UnsafeCell<MaybeUninit<T>>,
}
26 | | |
/// A bounded multi-producer multi-consumer queue.
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
/// faster than [`SegQueue`].
///
/// [`SegQueue`]: super::SegQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.push('a'), Ok(()));
/// assert_eq!(q.push('b'), Ok(()));
/// assert_eq!(q.push('c'), Err('c'));
/// assert_eq!(q.pop(), Some('a'));
/// ```
pub struct ArrayQueue<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    ///
    /// Allocated in `new` from a `Box<[Slot<T>]>` whose ownership is transferred to this raw
    /// pointer; presumably reconstituted and freed in `Drop` — confirm against the rest of the
    /// file, which is not visible here.
    buffer: *mut Slot<T>,

    /// The queue capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, index: 0 }`.
    ///
    /// This is the smallest power of two greater than `cap`, so `one_lap - 1` masks out the
    /// index portion of a stamp and `!(one_lap - 1)` masks out the lap portion.
    one_lap: usize,

    /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`.
    _marker: PhantomData<T>,
}
77 | | |
// SAFETY: `ArrayQueue<T>` contains a raw pointer, so `Send`/`Sync` are not derived
// automatically. Sharing or sending the queue is sound when `T: Send`: all interior
// mutation of `head`, `tail`, and the slot stamps goes through atomic operations, and
// slot values are accessed only by the single thread that won the corresponding CAS,
// which moves the `T` in (push) or out (pop) — values are handed off, never aliased.
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}
80 | | |
81 | | impl<T> ArrayQueue<T> { |
82 | | /// Creates a new bounded queue with the given capacity. |
83 | | /// |
84 | | /// # Panics |
85 | | /// |
86 | | /// Panics if the capacity is zero. |
87 | | /// |
88 | | /// # Examples |
89 | | /// |
90 | | /// ``` |
91 | | /// use crossbeam_queue::ArrayQueue; |
92 | | /// |
93 | | /// let q = ArrayQueue::<i32>::new(100); |
94 | | /// ``` |
95 | 117 | pub fn new(cap: usize) -> ArrayQueue<T> { |
96 | 117 | assert!(cap > 0, "capacity must be non-zero"); |
97 | | |
98 | | // Head is initialized to `{ lap: 0, index: 0 }`. |
99 | | // Tail is initialized to `{ lap: 0, index: 0 }`. |
100 | 116 | let head = 0; |
101 | 116 | let tail = 0; |
102 | 116 | |
103 | 116 | // Allocate a buffer of `cap` slots initialized |
104 | 116 | // with stamps. |
105 | 116 | let buffer = { |
106 | 116 | let mut boxed: Box<[Slot<T>]> = (0..cap) |
107 | 6.06k | .map(|i| { |
108 | 6.06k | // Set the stamp to `{ lap: 0, index: i }`. |
109 | 6.06k | Slot { |
110 | 6.06k | stamp: AtomicUsize::new(i), |
111 | 6.06k | value: UnsafeCell::new(MaybeUninit::uninit()), |
112 | 6.06k | } |
113 | 6.06k | }) <crossbeam_queue::array_queue::ArrayQueue<array_queue::drops::DropCounter>>::new::{closure#0} Line | Count | Source | 107 | 5.00k | .map(|i| { | 108 | 5.00k | // Set the stamp to `{ lap: 0, index: i }`. | 109 | 5.00k | Slot { | 110 | 5.00k | stamp: AtomicUsize::new(i), | 111 | 5.00k | value: UnsafeCell::new(MaybeUninit::uninit()), | 112 | 5.00k | } | 113 | 5.00k | }) |
<crossbeam_queue::array_queue::ArrayQueue<usize>>::new::{closure#0} Line | Count | Source | 107 | 1.00k | .map(|i| { | 108 | 1.00k | // Set the stamp to `{ lap: 0, index: i }`. | 109 | 1.00k | Slot { | 110 | 1.00k | stamp: AtomicUsize::new(i), | 111 | 1.00k | value: UnsafeCell::new(MaybeUninit::uninit()), | 112 | 1.00k | } | 113 | 1.00k | }) |
<crossbeam_queue::array_queue::ArrayQueue<()>>::new::{closure#0} Line | Count | Source | 107 | 2 | .map(|i| { | 108 | 2 | // Set the stamp to `{ lap: 0, index: i }`. | 109 | 2 | Slot { | 110 | 2 | stamp: AtomicUsize::new(i), | 111 | 2 | value: UnsafeCell::new(MaybeUninit::uninit()), | 112 | 2 | } | 113 | 2 | }) |
<crossbeam_queue::array_queue::ArrayQueue<i32>>::new::{closure#0} Line | Count | Source | 107 | 49 | .map(|i| { | 108 | 49 | // Set the stamp to `{ lap: 0, index: i }`. | 109 | 49 | Slot { | 110 | 49 | stamp: AtomicUsize::new(i), | 111 | 49 | value: UnsafeCell::new(MaybeUninit::uninit()), | 112 | 49 | } | 113 | 49 | }) |
<crossbeam_queue::array_queue::ArrayQueue<i32>>::new::{closure#0} Line | Count | Source | 107 | 10 | .map(|i| { | 108 | 10 | // Set the stamp to `{ lap: 0, index: i }`. | 109 | 10 | Slot { | 110 | 10 | stamp: AtomicUsize::new(i), | 111 | 10 | value: UnsafeCell::new(MaybeUninit::uninit()), | 112 | 10 | } | 113 | 10 | }) |
|
114 | 116 | .collect(); |
115 | 116 | let ptr = boxed.as_mut_ptr(); |
116 | 116 | mem::forget(boxed); |
117 | 116 | ptr |
118 | 116 | }; |
119 | 116 | |
120 | 116 | // One lap is the smallest power of two greater than `cap`. |
121 | 116 | let one_lap = (cap + 1).next_power_of_two(); |
122 | 116 | |
123 | 116 | ArrayQueue { |
124 | 116 | buffer, |
125 | 116 | cap, |
126 | 116 | one_lap, |
127 | 116 | head: CachePadded::new(AtomicUsize::new(head)), |
128 | 116 | tail: CachePadded::new(AtomicUsize::new(tail)), |
129 | 116 | _marker: PhantomData, |
130 | 116 | } |
131 | 116 | } <crossbeam_queue::array_queue::ArrayQueue<array_queue::drops::DropCounter>>::new Line | Count | Source | 95 | 100 | pub fn new(cap: usize) -> ArrayQueue<T> { | 96 | 100 | assert!(cap > 0, "capacity must be non-zero"); | 97 | | | 98 | | // Head is initialized to `{ lap: 0, index: 0 }`. | 99 | | // Tail is initialized to `{ lap: 0, index: 0 }`. | 100 | 100 | let head = 0; | 101 | 100 | let tail = 0; | 102 | 100 | | 103 | 100 | // Allocate a buffer of `cap` slots initialized | 104 | 100 | // with stamps. | 105 | 100 | let buffer = { | 106 | 100 | let mut boxed: Box<[Slot<T>]> = (0..cap) | 107 | 100 | .map(|i| { | 108 | | // Set the stamp to `{ lap: 0, index: i }`. | 109 | | Slot { | 110 | | stamp: AtomicUsize::new(i), | 111 | | value: UnsafeCell::new(MaybeUninit::uninit()), | 112 | | } | 113 | 100 | }) | 114 | 100 | .collect(); | 115 | 100 | let ptr = boxed.as_mut_ptr(); | 116 | 100 | mem::forget(boxed); | 117 | 100 | ptr | 118 | 100 | }; | 119 | 100 | | 120 | 100 | // One lap is the smallest power of two greater than `cap`. | 121 | 100 | let one_lap = (cap + 1).next_power_of_two(); | 122 | 100 | | 123 | 100 | ArrayQueue { | 124 | 100 | buffer, | 125 | 100 | cap, | 126 | 100 | one_lap, | 127 | 100 | head: CachePadded::new(AtomicUsize::new(head)), | 128 | 100 | tail: CachePadded::new(AtomicUsize::new(tail)), | 129 | 100 | _marker: PhantomData, | 130 | 100 | } | 131 | 100 | } |
<crossbeam_queue::array_queue::ArrayQueue<()>>::new Line | Count | Source | 95 | 1 | pub fn new(cap: usize) -> ArrayQueue<T> { | 96 | 1 | assert!(cap > 0, "capacity must be non-zero"); | 97 | | | 98 | | // Head is initialized to `{ lap: 0, index: 0 }`. | 99 | | // Tail is initialized to `{ lap: 0, index: 0 }`. | 100 | 1 | let head = 0; | 101 | 1 | let tail = 0; | 102 | 1 | | 103 | 1 | // Allocate a buffer of `cap` slots initialized | 104 | 1 | // with stamps. | 105 | 1 | let buffer = { | 106 | 1 | let mut boxed: Box<[Slot<T>]> = (0..cap) | 107 | 1 | .map(|i| { | 108 | | // Set the stamp to `{ lap: 0, index: i }`. | 109 | | Slot { | 110 | | stamp: AtomicUsize::new(i), | 111 | | value: UnsafeCell::new(MaybeUninit::uninit()), | 112 | | } | 113 | 1 | }) | 114 | 1 | .collect(); | 115 | 1 | let ptr = boxed.as_mut_ptr(); | 116 | 1 | mem::forget(boxed); | 117 | 1 | ptr | 118 | 1 | }; | 119 | 1 | | 120 | 1 | // One lap is the smallest power of two greater than `cap`. | 121 | 1 | let one_lap = (cap + 1).next_power_of_two(); | 122 | 1 | | 123 | 1 | ArrayQueue { | 124 | 1 | buffer, | 125 | 1 | cap, | 126 | 1 | one_lap, | 127 | 1 | head: CachePadded::new(AtomicUsize::new(head)), | 128 | 1 | tail: CachePadded::new(AtomicUsize::new(tail)), | 129 | 1 | _marker: PhantomData, | 130 | 1 | } | 131 | 1 | } |
<crossbeam_queue::array_queue::ArrayQueue<usize>>::new Line | Count | Source | 95 | 3 | pub fn new(cap: usize) -> ArrayQueue<T> { | 96 | 3 | assert!(cap > 0, "capacity must be non-zero"); | 97 | | | 98 | | // Head is initialized to `{ lap: 0, index: 0 }`. | 99 | | // Tail is initialized to `{ lap: 0, index: 0 }`. | 100 | 3 | let head = 0; | 101 | 3 | let tail = 0; | 102 | 3 | | 103 | 3 | // Allocate a buffer of `cap` slots initialized | 104 | 3 | // with stamps. | 105 | 3 | let buffer = { | 106 | 3 | let mut boxed: Box<[Slot<T>]> = (0..cap) | 107 | 3 | .map(|i| { | 108 | | // Set the stamp to `{ lap: 0, index: i }`. | 109 | | Slot { | 110 | | stamp: AtomicUsize::new(i), | 111 | | value: UnsafeCell::new(MaybeUninit::uninit()), | 112 | | } | 113 | 3 | }) | 114 | 3 | .collect(); | 115 | 3 | let ptr = boxed.as_mut_ptr(); | 116 | 3 | mem::forget(boxed); | 117 | 3 | ptr | 118 | 3 | }; | 119 | 3 | | 120 | 3 | // One lap is the smallest power of two greater than `cap`. | 121 | 3 | let one_lap = (cap + 1).next_power_of_two(); | 122 | 3 | | 123 | 3 | ArrayQueue { | 124 | 3 | buffer, | 125 | 3 | cap, | 126 | 3 | one_lap, | 127 | 3 | head: CachePadded::new(AtomicUsize::new(head)), | 128 | 3 | tail: CachePadded::new(AtomicUsize::new(tail)), | 129 | 3 | _marker: PhantomData, | 130 | 3 | } | 131 | 3 | } |
<crossbeam_queue::array_queue::ArrayQueue<i32>>::new Line | Count | Source | 95 | 12 | pub fn new(cap: usize) -> ArrayQueue<T> { | 96 | 12 | assert!(cap > 0, "capacity must be non-zero"); | 97 | | | 98 | | // Head is initialized to `{ lap: 0, index: 0 }`. | 99 | | // Tail is initialized to `{ lap: 0, index: 0 }`. | 100 | 11 | let head = 0; | 101 | 11 | let tail = 0; | 102 | 11 | | 103 | 11 | // Allocate a buffer of `cap` slots initialized | 104 | 11 | // with stamps. | 105 | 11 | let buffer = { | 106 | 11 | let mut boxed: Box<[Slot<T>]> = (0..cap) | 107 | 11 | .map(|i| { | 108 | | // Set the stamp to `{ lap: 0, index: i }`. | 109 | | Slot { | 110 | | stamp: AtomicUsize::new(i), | 111 | | value: UnsafeCell::new(MaybeUninit::uninit()), | 112 | | } | 113 | 11 | }) | 114 | 11 | .collect(); | 115 | 11 | let ptr = boxed.as_mut_ptr(); | 116 | 11 | mem::forget(boxed); | 117 | 11 | ptr | 118 | 11 | }; | 119 | 11 | | 120 | 11 | // One lap is the smallest power of two greater than `cap`. | 121 | 11 | let one_lap = (cap + 1).next_power_of_two(); | 122 | 11 | | 123 | 11 | ArrayQueue { | 124 | 11 | buffer, | 125 | 11 | cap, | 126 | 11 | one_lap, | 127 | 11 | head: CachePadded::new(AtomicUsize::new(head)), | 128 | 11 | tail: CachePadded::new(AtomicUsize::new(tail)), | 129 | 11 | _marker: PhantomData, | 130 | 11 | } | 131 | 11 | } |
<crossbeam_queue::array_queue::ArrayQueue<i32>>::new Line | Count | Source | 95 | 1 | pub fn new(cap: usize) -> ArrayQueue<T> { | 96 | 1 | assert!(cap > 0, "capacity must be non-zero"); | 97 | | | 98 | | // Head is initialized to `{ lap: 0, index: 0 }`. | 99 | | // Tail is initialized to `{ lap: 0, index: 0 }`. | 100 | 1 | let head = 0; | 101 | 1 | let tail = 0; | 102 | 1 | | 103 | 1 | // Allocate a buffer of `cap` slots initialized | 104 | 1 | // with stamps. | 105 | 1 | let buffer = { | 106 | 1 | let mut boxed: Box<[Slot<T>]> = (0..cap) | 107 | 1 | .map(|i| { | 108 | | // Set the stamp to `{ lap: 0, index: i }`. | 109 | | Slot { | 110 | | stamp: AtomicUsize::new(i), | 111 | | value: UnsafeCell::new(MaybeUninit::uninit()), | 112 | | } | 113 | 1 | }) | 114 | 1 | .collect(); | 115 | 1 | let ptr = boxed.as_mut_ptr(); | 116 | 1 | mem::forget(boxed); | 117 | 1 | ptr | 118 | 1 | }; | 119 | 1 | | 120 | 1 | // One lap is the smallest power of two greater than `cap`. | 121 | 1 | let one_lap = (cap + 1).next_power_of_two(); | 122 | 1 | | 123 | 1 | ArrayQueue { | 124 | 1 | buffer, | 125 | 1 | cap, | 126 | 1 | one_lap, | 127 | 1 | head: CachePadded::new(AtomicUsize::new(head)), | 128 | 1 | tail: CachePadded::new(AtomicUsize::new(tail)), | 129 | 1 | _marker: PhantomData, | 130 | 1 | } | 131 | 1 | } |
|
132 | | |
    /// Attempts to push an element into the queue.
    ///
    /// If the queue is full, the element is returned back as an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert_eq!(q.push(10), Ok(()));
    /// assert_eq!(q.push(20), Err(20));
    /// ```
    pub fn push(&self, value: T) -> Result<(), T> {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Deconstruct the tail into its index (low bits) and lap (high bits).
            let index = tail & (self.one_lap - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            // SAFETY: `index < one_lap` after masking, and tail stamps only ever hold
            // indices below `cap` (they wrap to 0 at `cap`), so the add stays in bounds
            // of the `cap`-slot buffer allocated in `new`.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, the slot is empty on our lap and we may
            // attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail. Winning this CAS gives us exclusive write access
                // to the slot until we publish the new stamp below.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Write the value into the slot and update the stamp.
                        // SAFETY: we won the CAS on `tail`, so no other thread will
                        // touch this slot's value until the Release store of the new
                        // stamp makes it visible to poppers.
                        unsafe {
                            slot.value.get().write(MaybeUninit::new(value));
                        }
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        // Lost the race; retry with the tail another thread installed.
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                // The stamp is a full lap behind `tail + 1`: the slot still holds a
                // value from the previous lap, so the queue *looks* full. Re-check
                // against the head before reporting failure.
                atomic::fence(Ordering::SeqCst);
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the queue is full.
                    return Err(value);
                }

                // A popper is mid-removal on this slot; give it a moment and retry.
                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }
| 181 | 540k | unsafe { | 182 | 540k | slot.value.get().write(MaybeUninit::new(value)); | 183 | 540k | } | 184 | 540k | slot.stamp.store(tail + 1, Ordering::Release); | 185 | 540k | return Ok(()); | 186 | | } | 187 | 0 | Err(t) => { | 188 | 0 | tail = t; | 189 | 0 | backoff.spin(); | 190 | 0 | } | 191 | | } | 192 | 35.9k | } else if stamp.wrapping_add(self.one_lap) == tail + 1 { | 193 | 35.9k | atomic::fence(Ordering::SeqCst); | 194 | 35.9k | let head = self.head.load(Ordering::Relaxed); | 195 | 35.9k | | 196 | 35.9k | // If the head lags one lap behind the tail as well... | 197 | 35.9k | if head.wrapping_add(self.one_lap) == tail { | 198 | | // ...then the queue is full. | 199 | 10.3k | return Err(value); | 200 | 25.6k | } | 201 | 25.6k | | 202 | 25.6k | backoff.spin(); | 203 | 25.6k | tail = self.tail.load(Ordering::Relaxed); | 204 | 0 | } else { | 205 | 0 | // Snooze because we need to wait for the stamp to get updated. | 206 | 0 | backoff.snooze(); | 207 | 0 | tail = self.tail.load(Ordering::Relaxed); | 208 | 0 | } | 209 | | } | 210 | 550k | } |
<crossbeam_queue::array_queue::ArrayQueue<i32>>::push Line | Count | Source | 147 | 99.9k | pub fn push(&self, value: T) -> Result<(), T> { | 148 | 99.9k | let backoff = Backoff::new(); | 149 | 99.9k | let mut tail = self.tail.load(Ordering::Relaxed); | 150 | | | 151 | 115k | loop { | 152 | 115k | // Deconstruct the tail. | 153 | 115k | let index = tail & (self.one_lap - 1); | 154 | 115k | let lap = tail & !(self.one_lap - 1); | 155 | 115k | | 156 | 115k | // Inspect the corresponding slot. | 157 | 115k | let slot = unsafe { &*self.buffer.add(index) }; | 158 | 115k | let stamp = slot.stamp.load(Ordering::Acquire); | 159 | 115k | | 160 | 115k | // If the tail and the stamp match, we may attempt to push. | 161 | 115k | if tail == stamp { | 162 | 110k | let new_tail = if index + 1 < self.cap { | 163 | | // Same lap, incremented index. | 164 | | // Set to `{ lap: lap, index: index + 1 }`. | 165 | 83.1k | tail + 1 | 166 | | } else { | 167 | | // One lap forward, index wraps around to zero. | 168 | | // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. | 169 | 27.3k | lap.wrapping_add(self.one_lap) | 170 | | }; | 171 | | | 172 | | // Try moving the tail. | 173 | 110k | match self.tail.compare_exchange_weak( | 174 | 110k | tail, | 175 | 110k | new_tail, | 176 | 110k | Ordering::SeqCst, | 177 | 110k | Ordering::Relaxed, | 178 | 110k | ) { | 179 | 110k | Ok(_) => { | 180 | | // Write the value into the slot and update the stamp. 
| 181 | 100k | unsafe { | 182 | 100k | slot.value.get().write(MaybeUninit::new(value)); | 183 | 100k | } | 184 | 100k | slot.stamp.store(tail + 1, Ordering::Release); | 185 | 100k | return Ok(()); | 186 | | } | 187 | 11.2k | Err(t) => { | 188 | 11.2k | tail = t; | 189 | 11.2k | backoff.spin(); | 190 | 11.2k | } | 191 | | } | 192 | 4.64k | } else if stamp.wrapping_add(self.one_lap) == tail + 1 { | 193 | 797 | atomic::fence(Ordering::SeqCst); | 194 | 797 | let head = self.head.load(Ordering::Relaxed); | 195 | 797 | | 196 | 797 | // If the head lags one lap behind the tail as well... | 197 | 797 | if head.wrapping_add(self.one_lap) == tail { | 198 | | // ...then the queue is full. | 199 | 679 | return Err(value); | 200 | 118 | } | 201 | 118 | | 202 | 118 | backoff.spin(); | 203 | 118 | tail = self.tail.load(Ordering::Relaxed); | 204 | 3.84k | } else { | 205 | 3.84k | // Snooze because we need to wait for the stamp to get updated. | 206 | 3.84k | backoff.snooze(); | 207 | 3.84k | tail = self.tail.load(Ordering::Relaxed); | 208 | 3.84k | } | 209 | | } | 210 | 100k | } |
<crossbeam_queue::array_queue::ArrayQueue<()>>::push Line | Count | Source | 147 | 2 | pub fn push(&self, value: T) -> Result<(), T> { | 148 | 2 | let backoff = Backoff::new(); | 149 | 2 | let mut tail = self.tail.load(Ordering::Relaxed); | 150 | | | 151 | 2 | loop { | 152 | 2 | // Deconstruct the tail. | 153 | 2 | let index = tail & (self.one_lap - 1); | 154 | 2 | let lap = tail & !(self.one_lap - 1); | 155 | 2 | | 156 | 2 | // Inspect the corresponding slot. | 157 | 2 | let slot = unsafe { &*self.buffer.add(index) }; | 158 | 2 | let stamp = slot.stamp.load(Ordering::Acquire); | 159 | 2 | | 160 | 2 | // If the tail and the stamp match, we may attempt to push. | 161 | 2 | if tail == stamp { | 162 | 2 | let new_tail = if index + 1 < self.cap { | 163 | | // Same lap, incremented index. | 164 | | // Set to `{ lap: lap, index: index + 1 }`. | 165 | 1 | tail + 1 | 166 | | } else { | 167 | | // One lap forward, index wraps around to zero. | 168 | | // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. | 169 | 1 | lap.wrapping_add(self.one_lap) | 170 | | }; | 171 | | | 172 | | // Try moving the tail. | 173 | 2 | match self.tail.compare_exchange_weak( | 174 | 2 | tail, | 175 | 2 | new_tail, | 176 | 2 | Ordering::SeqCst, | 177 | 2 | Ordering::Relaxed, | 178 | 2 | ) { | 179 | 2 | Ok(_) => { | 180 | | // Write the value into the slot and update the stamp. | 181 | 2 | unsafe { | 182 | 2 | slot.value.get().write(MaybeUninit::new(value)); | 183 | 2 | } | 184 | 2 | slot.stamp.store(tail + 1, Ordering::Release); | 185 | 2 | return Ok(()); | 186 | | } | 187 | 0 | Err(t) => { | 188 | 0 | tail = t; | 189 | 0 | backoff.spin(); | 190 | 0 | } | 191 | | } | 192 | 0 | } else if stamp.wrapping_add(self.one_lap) == tail + 1 { | 193 | 0 | atomic::fence(Ordering::SeqCst); | 194 | 0 | let head = self.head.load(Ordering::Relaxed); | 195 | 0 |
| 196 | 0 | // If the head lags one lap behind the tail as well... | 197 | 0 | if head.wrapping_add(self.one_lap) == tail { | 198 | | // ...then the queue is full. | 199 | 0 | return Err(value); | 200 | 0 | } | 201 | 0 |
| 202 | 0 | backoff.spin(); | 203 | 0 | tail = self.tail.load(Ordering::Relaxed); | 204 | 0 | } else { | 205 | 0 | // Snooze because we need to wait for the stamp to get updated. | 206 | 0 | backoff.snooze(); | 207 | 0 | tail = self.tail.load(Ordering::Relaxed); | 208 | 0 | } | 209 | | } | 210 | 2 | } |
<crossbeam_queue::array_queue::ArrayQueue<usize>>::push Line | Count | Source | 147 | 230k | pub fn push(&self, value: T) -> Result<(), T> { | 148 | 230k | let backoff = Backoff::new(); | 149 | 230k | let mut tail = self.tail.load(Ordering::Relaxed); | 150 | | | 151 | 259k | loop { | 152 | 259k | // Deconstruct the tail. | 153 | 259k | let index = tail & (self.one_lap - 1); | 154 | 259k | let lap = tail & !(self.one_lap - 1); | 155 | 259k | | 156 | 259k | // Inspect the corresponding slot. | 157 | 259k | let slot = unsafe { &*self.buffer.add(index) }; | 158 | 259k | let stamp = slot.stamp.load(Ordering::Acquire); | 159 | 259k | | 160 | 259k | // If the tail and the stamp match, we may attempt to push. | 161 | 259k | if tail == stamp { | 162 | 230k | let new_tail = if index + 1 < self.cap { | 163 | | // Same lap, incremented index. | 164 | | // Set to `{ lap: lap, index: index + 1 }`. | 165 | 164k | tail + 1 | 166 | | } else { | 167 | | // One lap forward, index wraps around to zero. | 168 | | // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. | 169 | 66.0k | lap.wrapping_add(self.one_lap) | 170 | | }; | 171 | | | 172 | | // Try moving the tail. | 173 | 230k | match self.tail.compare_exchange_weak( | 174 | 230k | tail, | 175 | 230k | new_tail, | 176 | 230k | Ordering::SeqCst, | 177 | 230k | Ordering::Relaxed, | 178 | 230k | ) { | 179 | 230k | Ok(_) => { | 180 | | // Write the value into the slot and update the stamp. 
| 181 | 225k | unsafe { | 182 | 225k | slot.value.get().write(MaybeUninit::new(value)); | 183 | 225k | } | 184 | 225k | slot.stamp.store(tail + 1, Ordering::Release); | 185 | 225k | return Ok(()); | 186 | | } | 187 | 8.55k | Err(t) => { | 188 | 8.55k | tail = t; | 189 | 8.55k | backoff.spin(); | 190 | 8.55k | } | 191 | | } | 192 | 29.2k | } else if stamp.wrapping_add(self.one_lap) == tail + 1 { | 193 | 23.4k | atomic::fence(Ordering::SeqCst); | 194 | 23.4k | let head = self.head.load(Ordering::Relaxed); | 195 | 23.4k | | 196 | 23.4k | // If the head lags one lap behind the tail as well... | 197 | 23.4k | if head.wrapping_add(self.one_lap) == tail { | 198 | | // ...then the queue is full. | 199 | 8.37k | return Err(value); | 200 | 15.1k | } | 201 | 15.1k | | 202 | 15.1k | backoff.spin(); | 203 | 15.1k | tail = self.tail.load(Ordering::Relaxed); | 204 | 5.80k | } else { | 205 | 5.80k | // Snooze because we need to wait for the stamp to get updated. | 206 | 5.80k | backoff.snooze(); | 207 | 5.80k | tail = self.tail.load(Ordering::Relaxed); | 208 | 5.80k | } | 209 | | } | 210 | 234k | } |
<crossbeam_queue::array_queue::ArrayQueue<i32>>::push Line | Count | Source | 147 | 1 | pub fn push(&self, value: T) -> Result<(), T> { | 148 | 1 | let backoff = Backoff::new(); | 149 | 1 | let mut tail = self.tail.load(Ordering::Relaxed); | 150 | | | 151 | 1 | loop { | 152 | 1 | // Deconstruct the tail. | 153 | 1 | let index = tail & (self.one_lap - 1); | 154 | 1 | let lap = tail & !(self.one_lap - 1); | 155 | 1 | | 156 | 1 | // Inspect the corresponding slot. | 157 | 1 | let slot = unsafe { &*self.buffer.add(index) }; | 158 | 1 | let stamp = slot.stamp.load(Ordering::Acquire); | 159 | 1 | | 160 | 1 | // If the tail and the stamp match, we may attempt to push. | 161 | 1 | if tail == stamp { | 162 | 1 | let new_tail = if index + 1 < self.cap { | 163 | | // Same lap, incremented index. | 164 | | // Set to `{ lap: lap, index: index + 1 }`. | 165 | 1 | tail + 1 | 166 | | } else { | 167 | | // One lap forward, index wraps around to zero. | 168 | | // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. | 169 | 0 | lap.wrapping_add(self.one_lap) | 170 | | }; | 171 | | | 172 | | // Try moving the tail. | 173 | 1 | match self.tail.compare_exchange_weak( | 174 | 1 | tail, | 175 | 1 | new_tail, | 176 | 1 | Ordering::SeqCst, | 177 | 1 | Ordering::Relaxed, | 178 | 1 | ) { | 179 | 1 | Ok(_) => { | 180 | | // Write the value into the slot and update the stamp. | 181 | 1 | unsafe { | 182 | 1 | slot.value.get().write(MaybeUninit::new(value)); | 183 | 1 | } | 184 | 1 | slot.stamp.store(tail + 1, Ordering::Release); | 185 | 1 | return Ok(()); | 186 | | } | 187 | 0 | Err(t) => { | 188 | 0 | tail = t; | 189 | 0 | backoff.spin(); | 190 | 0 | } | 191 | | } | 192 | 0 | } else if stamp.wrapping_add(self.one_lap) == tail + 1 { | 193 | 0 | atomic::fence(Ordering::SeqCst); | 194 | 0 | let head = self.head.load(Ordering::Relaxed); | 195 | 0 |
| 196 | 0 | // If the head lags one lap behind the tail as well... | 197 | 0 | if head.wrapping_add(self.one_lap) == tail { | 198 | | // ...then the queue is full. | 199 | 0 | return Err(value); | 200 | 0 | } | 201 | 0 |
| 202 | 0 | backoff.spin(); | 203 | 0 | tail = self.tail.load(Ordering::Relaxed); | 204 | 0 | } else { | 205 | 0 | // Snooze because we need to wait for the stamp to get updated. | 206 | 0 | backoff.snooze(); | 207 | 0 | tail = self.tail.load(Ordering::Relaxed); | 208 | 0 | } | 209 | | } | 210 | 1 | } |
|
211 | | |
    /// Attempts to pop an element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    /// assert_eq!(q.push(10), Ok(()));
    ///
    /// assert_eq!(q.pop(), Some(10));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head into its index (low bits) and lap (high bits).
            let index = head & (self.one_lap - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            // SAFETY: `index < one_lap` after masking, and head stamps only ever hold
            // indices below `cap` (they wrap to 0 at `cap`), so the add stays in bounds
            // of the `cap`-slot buffer allocated in `new`.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, the slot holds a value written on
            // this lap and we may attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head. Winning this CAS gives us exclusive read access
                // to the slot until we publish the new stamp below.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Read the value from the slot and update the stamp.
                        // SAFETY: we won the CAS on `head`, and the Acquire-loaded stamp
                        // proved a pusher's Release store completed, so the value is
                        // initialized and ours alone to take.
                        let msg = unsafe { slot.value.get().read().assume_init() };
                        // Mark the slot as writable on the *next* lap.
                        slot.stamp
                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Some(msg);
                    }
                    Err(h) => {
                        // Lost the race; retry with the head another thread installed.
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
                // The slot has not been written on this lap, so the queue *looks*
                // empty. Re-check against the tail before reporting failure.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if tail == head {
                    return None;
                }

                // A pusher is mid-insertion on this slot; give it a moment and retry.
                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }
| 261 | 100k | let msg = unsafe { slot.value.get().read().assume_init() }; | 262 | 100k | slot.stamp | 263 | 100k | .store(head.wrapping_add(self.one_lap), Ordering::Release); | 264 | 100k | return Some(msg); | 265 | | } | 266 | 13.1k | Err(h) => { | 267 | 13.1k | head = h; | 268 | 13.1k | backoff.spin(); | 269 | 13.1k | } | 270 | | } | 271 | 4.15k | } else if stamp == head { | 272 | 949 | atomic::fence(Ordering::SeqCst); | 273 | 949 | let tail = self.tail.load(Ordering::Relaxed); | 274 | 949 | | 275 | 949 | // If the tail equals the head, that means the channel is empty. | 276 | 949 | if tail == head { | 277 | 879 | return None; | 278 | 70 | } | 279 | 70 | | 280 | 70 | backoff.spin(); | 281 | 70 | head = self.head.load(Ordering::Relaxed); | 282 | 3.20k | } else { | 283 | 3.20k | // Snooze because we need to wait for the stamp to get updated. | 284 | 3.20k | backoff.snooze(); | 285 | 3.20k | head = self.head.load(Ordering::Relaxed); | 286 | 3.20k | } | 287 | | } | 288 | 100k | } |
<crossbeam_queue::array_queue::ArrayQueue<array_queue::drops::DropCounter>>::pop Line | Count | Source | 227 | 565k | pub fn pop(&self) -> Option<T> { | 228 | 565k | let backoff = Backoff::new(); | 229 | 565k | let mut head = self.head.load(Ordering::Relaxed); | 230 | | | 231 | 565k | loop { | 232 | 565k | // Deconstruct the head. | 233 | 565k | let index = head & (self.one_lap - 1); | 234 | 565k | let lap = head & !(self.one_lap - 1); | 235 | 565k | | 236 | 565k | // Inspect the corresponding slot. | 237 | 565k | let slot = unsafe { &*self.buffer.add(index) }; | 238 | 565k | let stamp = slot.stamp.load(Ordering::Acquire); | 239 | 565k | | 240 | 565k | // If the the stamp is ahead of the head by 1, we may attempt to pop. | 241 | 565k | if head + 1 == stamp { | 242 | 537k | let new = if index + 1 < self.cap { | 243 | | // Same lap, incremented index. | 244 | | // Set to `{ lap: lap, index: index + 1 }`. | 245 | 527k | head + 1 | 246 | | } else { | 247 | | // One lap forward, index wraps around to zero. | 248 | | // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. | 249 | 10.7k | lap.wrapping_add(self.one_lap) | 250 | | }; | 251 | | | 252 | | // Try moving the head. | 253 | 537k | match self.head.compare_exchange_weak( | 254 | 537k | head, | 255 | 537k | new, | 256 | 537k | Ordering::SeqCst, | 257 | 537k | Ordering::Relaxed, | 258 | 537k | ) { | 259 | 537k | Ok(_) => { | 260 | | // Read the value from the slot and update the stamp. 
| 261 | 537k | let msg = unsafe { slot.value.get().read().assume_init() }; | 262 | 537k | slot.stamp | 263 | 537k | .store(head.wrapping_add(self.one_lap), Ordering::Release); | 264 | 537k | return Some(msg); | 265 | | } | 266 | 0 | Err(h) => { | 267 | 0 | head = h; | 268 | 0 | backoff.spin(); | 269 | 0 | } | 270 | | } | 271 | 27.7k | } else if stamp == head { | 272 | 27.7k | atomic::fence(Ordering::SeqCst); | 273 | 27.7k | let tail = self.tail.load(Ordering::Relaxed); | 274 | 27.7k | | 275 | 27.7k | // If the tail equals the head, that means the channel is empty. | 276 | 27.7k | if tail == head { | 277 | 27.1k | return None; | 278 | 575 | } | 279 | 575 | | 280 | 575 | backoff.spin(); | 281 | 575 | head = self.head.load(Ordering::Relaxed); | 282 | 0 | } else { | 283 | 0 | // Snooze because we need to wait for the stamp to get updated. | 284 | 0 | backoff.snooze(); | 285 | 0 | head = self.head.load(Ordering::Relaxed); | 286 | 0 | } | 287 | | } | 288 | 565k | } |
<crossbeam_queue::array_queue::ArrayQueue<()>>::pop Line | Count | Source | 227 | 1 | pub fn pop(&self) -> Option<T> { | 228 | 1 | let backoff = Backoff::new(); | 229 | 1 | let mut head = self.head.load(Ordering::Relaxed); | 230 | | | 231 | 1 | loop { | 232 | 1 | // Deconstruct the head. | 233 | 1 | let index = head & (self.one_lap - 1); | 234 | 1 | let lap = head & !(self.one_lap - 1); | 235 | 1 | | 236 | 1 | // Inspect the corresponding slot. | 237 | 1 | let slot = unsafe { &*self.buffer.add(index) }; | 238 | 1 | let stamp = slot.stamp.load(Ordering::Acquire); | 239 | 1 | | 240 | 1 | // If the the stamp is ahead of the head by 1, we may attempt to pop. | 241 | 1 | if head + 1 == stamp { | 242 | 1 | let new = if index + 1 < self.cap { | 243 | | // Same lap, incremented index. | 244 | | // Set to `{ lap: lap, index: index + 1 }`. | 245 | 1 | head + 1 | 246 | | } else { | 247 | | // One lap forward, index wraps around to zero. | 248 | | // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. | 249 | 0 | lap.wrapping_add(self.one_lap) | 250 | | }; | 251 | | | 252 | | // Try moving the head. | 253 | 1 | match self.head.compare_exchange_weak( | 254 | 1 | head, | 255 | 1 | new, | 256 | 1 | Ordering::SeqCst, | 257 | 1 | Ordering::Relaxed, | 258 | 1 | ) { | 259 | 1 | Ok(_) => { | 260 | | // Read the value from the slot and update the stamp. | 261 | 1 | let msg = unsafe { slot.value.get().read().assume_init() }; | 262 | 1 | slot.stamp | 263 | 1 | .store(head.wrapping_add(self.one_lap), Ordering::Release); | 264 | 1 | return Some(msg); | 265 | | } | 266 | 0 | Err(h) => { | 267 | 0 | head = h; | 268 | 0 | backoff.spin(); | 269 | 0 | } | 270 | | } | 271 | 0 | } else if stamp == head { | 272 | 0 | atomic::fence(Ordering::SeqCst); | 273 | 0 | let tail = self.tail.load(Ordering::Relaxed); | 274 | 0 |
| 275 | 0 | // If the tail equals the head, that means the channel is empty. | 276 | 0 | if tail == head { | 277 | 0 | return None; | 278 | 0 | } | 279 | 0 |
| 280 | 0 | backoff.spin(); | 281 | 0 | head = self.head.load(Ordering::Relaxed); | 282 | 0 | } else { | 283 | 0 | // Snooze because we need to wait for the stamp to get updated. | 284 | 0 | backoff.snooze(); | 285 | 0 | head = self.head.load(Ordering::Relaxed); | 286 | 0 | } | 287 | | } | 288 | 1 | } |
<crossbeam_queue::array_queue::ArrayQueue<usize>>::pop Line | Count | Source | 227 | 233k | pub fn pop(&self) -> Option<T> { | 228 | 233k | let backoff = Backoff::new(); | 229 | 233k | let mut head = self.head.load(Ordering::Relaxed); | 230 | | | 231 | 255k | loop { | 232 | 255k | // Deconstruct the head. | 233 | 255k | let index = head & (self.one_lap - 1); | 234 | 255k | let lap = head & !(self.one_lap - 1); | 235 | 255k | | 236 | 255k | // Inspect the corresponding slot. | 237 | 255k | let slot = unsafe { &*self.buffer.add(index) }; | 238 | 255k | let stamp = slot.stamp.load(Ordering::Acquire); | 239 | 255k | | 240 | 255k | // If the the stamp is ahead of the head by 1, we may attempt to pop. | 241 | 255k | if head + 1 == stamp { | 242 | 232k | let new = if index + 1 < self.cap { | 243 | | // Same lap, incremented index. | 244 | | // Set to `{ lap: lap, index: index + 1 }`. | 245 | 165k | head + 1 | 246 | | } else { | 247 | | // One lap forward, index wraps around to zero. | 248 | | // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. | 249 | 67.2k | lap.wrapping_add(self.one_lap) | 250 | | }; | 251 | | | 252 | | // Try moving the head. | 253 | 232k | match self.head.compare_exchange_weak( | 254 | 232k | head, | 255 | 232k | new, | 256 | 232k | Ordering::SeqCst, | 257 | 232k | Ordering::Relaxed, | 258 | 232k | ) { | 259 | 232k | Ok(_) => { | 260 | | // Read the value from the slot and update the stamp. 
| 261 | 224k | let msg = unsafe { slot.value.get().read().assume_init() }; | 262 | 224k | slot.stamp | 263 | 224k | .store(head.wrapping_add(self.one_lap), Ordering::Release); | 264 | 224k | return Some(msg); | 265 | | } | 266 | 11.5k | Err(h) => { | 267 | 11.5k | head = h; | 268 | 11.5k | backoff.spin(); | 269 | 11.5k | } | 270 | | } | 271 | 23.0k | } else if stamp == head { | 272 | 16.4k | atomic::fence(Ordering::SeqCst); | 273 | 16.4k | let tail = self.tail.load(Ordering::Relaxed); | 274 | 16.4k | | 275 | 16.4k | // If the tail equals the head, that means the channel is empty. | 276 | 16.4k | if tail == head { | 277 | 12.8k | return None; | 278 | 3.60k | } | 279 | 3.60k | | 280 | 3.60k | backoff.spin(); | 281 | 3.60k | head = self.head.load(Ordering::Relaxed); | 282 | 6.61k | } else { | 283 | 6.61k | // Snooze because we need to wait for the stamp to get updated. | 284 | 6.61k | backoff.snooze(); | 285 | 6.61k | head = self.head.load(Ordering::Relaxed); | 286 | 6.61k | } | 287 | | } | 288 | 237k | } |
<crossbeam_queue::array_queue::ArrayQueue<i32>>::pop Line | Count | Source | 227 | 1 | pub fn pop(&self) -> Option<T> { | 228 | 1 | let backoff = Backoff::new(); | 229 | 1 | let mut head = self.head.load(Ordering::Relaxed); | 230 | | | 231 | 1 | loop { | 232 | 1 | // Deconstruct the head. | 233 | 1 | let index = head & (self.one_lap - 1); | 234 | 1 | let lap = head & !(self.one_lap - 1); | 235 | 1 | | 236 | 1 | // Inspect the corresponding slot. | 237 | 1 | let slot = unsafe { &*self.buffer.add(index) }; | 238 | 1 | let stamp = slot.stamp.load(Ordering::Acquire); | 239 | 1 | | 240 | 1 | // If the the stamp is ahead of the head by 1, we may attempt to pop. | 241 | 1 | if head + 1 == stamp { | 242 | 1 | let new = if index + 1 < self.cap { | 243 | | // Same lap, incremented index. | 244 | | // Set to `{ lap: lap, index: index + 1 }`. | 245 | 1 | head + 1 | 246 | | } else { | 247 | | // One lap forward, index wraps around to zero. | 248 | | // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. | 249 | 0 | lap.wrapping_add(self.one_lap) | 250 | | }; | 251 | | | 252 | | // Try moving the head. | 253 | 1 | match self.head.compare_exchange_weak( | 254 | 1 | head, | 255 | 1 | new, | 256 | 1 | Ordering::SeqCst, | 257 | 1 | Ordering::Relaxed, | 258 | 1 | ) { | 259 | 1 | Ok(_) => { | 260 | | // Read the value from the slot and update the stamp. | 261 | 1 | let msg = unsafe { slot.value.get().read().assume_init() }; | 262 | 1 | slot.stamp | 263 | 1 | .store(head.wrapping_add(self.one_lap), Ordering::Release); | 264 | 1 | return Some(msg); | 265 | | } | 266 | 0 | Err(h) => { | 267 | 0 | head = h; | 268 | 0 | backoff.spin(); | 269 | 0 | } | 270 | | } | 271 | 0 | } else if stamp == head { | 272 | 0 | atomic::fence(Ordering::SeqCst); | 273 | 0 | let tail = self.tail.load(Ordering::Relaxed); | 274 | 0 |
| 275 | 0 | // If the tail equals the head, that means the channel is empty. | 276 | 0 | if tail == head { | 277 | 0 | return None; | 278 | 0 | } | 279 | 0 |
| 280 | 0 | backoff.spin(); | 281 | 0 | head = self.head.load(Ordering::Relaxed); | 282 | 0 | } else { | 283 | 0 | // Snooze because we need to wait for the stamp to get updated. | 284 | 0 | backoff.snooze(); | 285 | 0 | head = self.head.load(Ordering::Relaxed); | 286 | 0 | } | 287 | | } | 288 | 1 | } |
|
289 | | |
290 | | /// Returns the capacity of the queue. |
291 | | /// |
292 | | /// # Examples |
293 | | /// |
294 | | /// ``` |
295 | | /// use crossbeam_queue::ArrayQueue; |
296 | | /// |
297 | | /// let q = ArrayQueue::<i32>::new(100); |
298 | | /// |
299 | | /// assert_eq!(q.capacity(), 100); |
300 | | /// ``` |
301 | 9 | pub fn capacity(&self) -> usize { |
302 | 9 | self.cap |
303 | 9 | } |
304 | | |
305 | | /// Returns `true` if the queue is empty. |
306 | | /// |
307 | | /// # Examples |
308 | | /// |
309 | | /// ``` |
310 | | /// use crossbeam_queue::ArrayQueue; |
311 | | /// |
312 | | /// let q = ArrayQueue::new(100); |
313 | | /// |
314 | | /// assert!(q.is_empty()); |
315 | | /// q.push(1).unwrap(); |
316 | | /// assert!(!q.is_empty()); |
317 | | /// ``` |
318 | 4 | pub fn is_empty(&self) -> bool { |
319 | 4 | let head = self.head.load(Ordering::SeqCst); |
320 | 4 | let tail = self.tail.load(Ordering::SeqCst); |
321 | 4 | |
322 | 4 | // Is the tail lagging one lap behind head? |
323 | 4 | // Is the tail equal to the head? |
324 | 4 | // |
325 | 4 | // Note: If the head changes just before we load the tail, that means there was a moment |
326 | 4 | // when the channel was not empty, so it is safe to just return `false`. |
327 | 4 | tail == head |
328 | 4 | } |
329 | | |
330 | | /// Returns `true` if the queue is full. |
331 | | /// |
332 | | /// # Examples |
333 | | /// |
334 | | /// ``` |
335 | | /// use crossbeam_queue::ArrayQueue; |
336 | | /// |
337 | | /// let q = ArrayQueue::new(1); |
338 | | /// |
339 | | /// assert!(!q.is_full()); |
340 | | /// q.push(1).unwrap(); |
341 | | /// assert!(q.is_full()); |
342 | | /// ``` |
343 | 4 | pub fn is_full(&self) -> bool { |
344 | 4 | let tail = self.tail.load(Ordering::SeqCst); |
345 | 4 | let head = self.head.load(Ordering::SeqCst); |
346 | 4 | |
347 | 4 | // Is the head lagging one lap behind tail? |
348 | 4 | // |
349 | 4 | // Note: If the tail changes just before we load the head, that means there was a moment |
350 | 4 | // when the queue was not full, so it is safe to just return `false`. |
351 | 4 | head.wrapping_add(self.one_lap) == tail |
352 | 4 | } |
353 | | |
354 | | /// Returns the number of elements in the queue. |
355 | | /// |
356 | | /// # Examples |
357 | | /// |
358 | | /// ``` |
359 | | /// use crossbeam_queue::ArrayQueue; |
360 | | /// |
361 | | /// let q = ArrayQueue::new(100); |
362 | | /// assert_eq!(q.len(), 0); |
363 | | /// |
364 | | /// q.push(10).unwrap(); |
365 | | /// assert_eq!(q.len(), 1); |
366 | | /// |
367 | | /// q.push(20).unwrap(); |
368 | | /// assert_eq!(q.len(), 2); |
369 | | /// ``` |
370 | 59.2k | pub fn len(&self) -> usize { |
371 | 62.5k | loop { |
372 | 62.5k | // Load the tail, then load the head. |
373 | 62.5k | let tail = self.tail.load(Ordering::SeqCst); |
374 | 62.5k | let head = self.head.load(Ordering::SeqCst); |
375 | 62.5k | |
376 | 62.5k | // If the tail didn't change, we've got consistent values to work with. |
377 | 62.5k | if self.tail.load(Ordering::SeqCst) == tail { |
378 | 59.2k | let hix = head & (self.one_lap - 1); |
379 | 59.2k | let tix = tail & (self.one_lap - 1); |
380 | 59.2k | |
381 | 59.2k | return if hix < tix { |
382 | 35.6k | tix - hix |
383 | 23.6k | } else if hix > tix { |
384 | 19.3k | self.cap - hix + tix |
385 | 4.24k | } else if tail == head { |
386 | 392 | 0 |
387 | | } else { |
388 | 195 | self.cap |
389 | | }; |
390 | 3.24k | } |
391 | | } |
392 | 55.6k | } <crossbeam_queue::array_queue::ArrayQueue<array_queue::drops::DropCounter>>::len Line | Count | Source | 370 | 100 | pub fn len(&self) -> usize { | 371 | 100 | loop { | 372 | 100 | // Load the tail, then load the head. | 373 | 100 | let tail = self.tail.load(Ordering::SeqCst); | 374 | 100 | let head = self.head.load(Ordering::SeqCst); | 375 | 100 | | 376 | 100 | // If the tail didn't change, we've got consistent values to work with. | 377 | 100 | if self.tail.load(Ordering::SeqCst) == tail { | 378 | 100 | let hix = head & (self.one_lap - 1); | 379 | 100 | let tix = tail & (self.one_lap - 1); | 380 | 100 | | 381 | 100 | return if hix < tix { | 382 | 51 | tix - hix | 383 | 49 | } else if hix > tix { | 384 | 45 | self.cap - hix + tix | 385 | 4 | } else if tail == head { | 386 | 4 | 0 | 387 | | } else { | 388 | 0 | self.cap | 389 | | }; | 390 | 0 | } | 391 | | } | 392 | 100 | } |
<crossbeam_queue::array_queue::ArrayQueue<()>>::len Line | Count | Source | 370 | 5 | pub fn len(&self) -> usize { | 371 | 5 | loop { | 372 | 5 | // Load the tail, then load the head. | 373 | 5 | let tail = self.tail.load(Ordering::SeqCst); | 374 | 5 | let head = self.head.load(Ordering::SeqCst); | 375 | 5 | | 376 | 5 | // If the tail didn't change, we've got consistent values to work with. | 377 | 5 | if self.tail.load(Ordering::SeqCst) == tail { | 378 | 5 | let hix = head & (self.one_lap - 1); | 379 | 5 | let tix = tail & (self.one_lap - 1); | 380 | 5 | | 381 | 5 | return if hix < tix { | 382 | 1 | tix - hix | 383 | 4 | } else if hix > tix { | 384 | 2 | self.cap - hix + tix | 385 | 2 | } else if tail == head { | 386 | 1 | 0 | 387 | | } else { | 388 | 1 | self.cap | 389 | | }; | 390 | 0 | } | 391 | | } | 392 | 5 | } |
<crossbeam_queue::array_queue::ArrayQueue<i32>>::len Line | Count | Source | 370 | 11 | pub fn len(&self) -> usize { | 371 | 11 | loop { | 372 | 11 | // Load the tail, then load the head. | 373 | 11 | let tail = self.tail.load(Ordering::SeqCst); | 374 | 11 | let head = self.head.load(Ordering::SeqCst); | 375 | 11 | | 376 | 11 | // If the tail didn't change, we've got consistent values to work with. | 377 | 11 | if self.tail.load(Ordering::SeqCst) == tail { | 378 | 11 | let hix = head & (self.one_lap - 1); | 379 | 11 | let tix = tail & (self.one_lap - 1); | 380 | 11 | | 381 | 11 | return if hix < tix { | 382 | 0 | tix - hix | 383 | 11 | } else if hix > tix { | 384 | 0 | self.cap - hix + tix | 385 | 11 | } else if tail == head { | 386 | 11 | 0 | 387 | | } else { | 388 | 0 | self.cap | 389 | | }; | 390 | 0 | } | 391 | | } | 392 | 11 | } |
<crossbeam_queue::array_queue::ArrayQueue<usize>>::len Line | Count | Source | 370 | 59.1k | pub fn len(&self) -> usize { | 371 | 62.4k | loop { | 372 | 62.4k | // Load the tail, then load the head. | 373 | 62.4k | let tail = self.tail.load(Ordering::SeqCst); | 374 | 62.4k | let head = self.head.load(Ordering::SeqCst); | 375 | 62.4k | | 376 | 62.4k | // If the tail didn't change, we've got consistent values to work with. | 377 | 62.4k | if self.tail.load(Ordering::SeqCst) == tail { | 378 | 59.1k | let hix = head & (self.one_lap - 1); | 379 | 59.1k | let tix = tail & (self.one_lap - 1); | 380 | 59.1k | | 381 | 59.1k | return if hix < tix { | 382 | 35.6k | tix - hix | 383 | 23.5k | } else if hix > tix { | 384 | 19.3k | self.cap - hix + tix | 385 | 4.23k | } else if tail == head { | 386 | 375 | 0 | 387 | | } else { | 388 | 194 | self.cap | 389 | | }; | 390 | 3.24k | } | 391 | | } | 392 | 55.5k | } |
<crossbeam_queue::array_queue::ArrayQueue<i32>>::len Line | Count | Source | 370 | 1 | pub fn len(&self) -> usize { | 371 | 1 | loop { | 372 | 1 | // Load the tail, then load the head. | 373 | 1 | let tail = self.tail.load(Ordering::SeqCst); | 374 | 1 | let head = self.head.load(Ordering::SeqCst); | 375 | 1 | | 376 | 1 | // If the tail didn't change, we've got consistent values to work with. | 377 | 1 | if self.tail.load(Ordering::SeqCst) == tail { | 378 | 1 | let hix = head & (self.one_lap - 1); | 379 | 1 | let tix = tail & (self.one_lap - 1); | 380 | 1 | | 381 | 1 | return if hix < tix { | 382 | 0 | tix - hix | 383 | 1 | } else if hix > tix { | 384 | 0 | self.cap - hix + tix | 385 | 1 | } else if tail == head { | 386 | 1 | 0 | 387 | | } else { | 388 | 0 | self.cap | 389 | | }; | 390 | 0 | } | 391 | | } | 392 | 1 | } |
|
393 | | } |
394 | | |
395 | | impl<T> Drop for ArrayQueue<T> { |
396 | 116 | fn drop(&mut self) { |
397 | 116 | // Get the index of the head. |
398 | 116 | let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1); |
399 | | |
400 | | // Loop over all slots that hold a message and drop them. |
401 | 2.43k | for i in 0..self.len()116 { |
402 | | // Compute the index of the next slot holding a message. |
403 | 2.43k | let index = if hix + i < self.cap { |
404 | 1.67k | hix + i |
405 | | } else { |
406 | 754 | hix + i - self.cap |
407 | | }; |
408 | | |
409 | 2.43k | unsafe { |
410 | 2.43k | let p = { |
411 | 2.43k | let slot = &mut *self.buffer.add(index); |
412 | 2.43k | let value = &mut *slot.value.get(); |
413 | 2.43k | value.as_mut_ptr() |
414 | 2.43k | }; |
415 | 2.43k | p.drop_in_place(); |
416 | 2.43k | } |
417 | | } |
418 | | |
419 | | // Finally, deallocate the buffer, but don't run any destructors. |
420 | 116 | unsafe { |
421 | 116 | // Create a slice from the buffer to make |
422 | 116 | // a fat pointer. Then, use Box::from_raw |
423 | 116 | // to deallocate it. |
424 | 116 | let ptr = core::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; |
425 | 116 | Box::from_raw(ptr); |
426 | 116 | } |
427 | 116 | } <crossbeam_queue::array_queue::ArrayQueue<array_queue::drops::DropCounter> as core::ops::drop::Drop>::drop Line | Count | Source | 396 | 100 | fn drop(&mut self) { | 397 | 100 | // Get the index of the head. | 398 | 100 | let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1); | 399 | | | 400 | | // Loop over all slots that hold a message and drop them. | 401 | 2.43k | for i in 0..self.len()100 { | 402 | | // Compute the index of the next slot holding a message. | 403 | 2.43k | let index = if hix + i < self.cap { | 404 | 1.67k | hix + i | 405 | | } else { | 406 | 754 | hix + i - self.cap | 407 | | }; | 408 | | | 409 | 2.43k | unsafe { | 410 | 2.43k | let p = { | 411 | 2.43k | let slot = &mut *self.buffer.add(index); | 412 | 2.43k | let value = &mut *slot.value.get(); | 413 | 2.43k | value.as_mut_ptr() | 414 | 2.43k | }; | 415 | 2.43k | p.drop_in_place(); | 416 | 2.43k | } | 417 | | } | 418 | | | 419 | | // Finally, deallocate the buffer, but don't run any destructors. | 420 | 100 | unsafe { | 421 | 100 | // Create a slice from the buffer to make | 422 | 100 | // a fat pointer. Then, use Box::from_raw | 423 | 100 | // to deallocate it. | 424 | 100 | let ptr = core::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; | 425 | 100 | Box::from_raw(ptr); | 426 | 100 | } | 427 | 100 | } |
<crossbeam_queue::array_queue::ArrayQueue<usize> as core::ops::drop::Drop>::drop Line | Count | Source | 396 | 3 | fn drop(&mut self) { | 397 | 3 | // Get the index of the head. | 398 | 3 | let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1); | 399 | | | 400 | | // Loop over all slots that hold a message and drop them. | 401 | 3 | for i0 in 0..self.len() { | 402 | | // Compute the index of the next slot holding a message. | 403 | 0 | let index = if hix + i < self.cap { | 404 | 0 | hix + i | 405 | | } else { | 406 | 0 | hix + i - self.cap | 407 | | }; | 408 | | | 409 | 0 | unsafe { | 410 | 0 | let p = { | 411 | 0 | let slot = &mut *self.buffer.add(index); | 412 | 0 | let value = &mut *slot.value.get(); | 413 | 0 | value.as_mut_ptr() | 414 | 0 | }; | 415 | 0 | p.drop_in_place(); | 416 | 0 | } | 417 | | } | 418 | | | 419 | | // Finally, deallocate the buffer, but don't run any destructors. | 420 | 3 | unsafe { | 421 | 3 | // Create a slice from the buffer to make | 422 | 3 | // a fat pointer. Then, use Box::from_raw | 423 | 3 | // to deallocate it. | 424 | 3 | let ptr = core::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; | 425 | 3 | Box::from_raw(ptr); | 426 | 3 | } | 427 | 3 | } |
<crossbeam_queue::array_queue::ArrayQueue<i32> as core::ops::drop::Drop>::drop Line | Count | Source | 396 | 11 | fn drop(&mut self) { | 397 | 11 | // Get the index of the head. | 398 | 11 | let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1); | 399 | | | 400 | | // Loop over all slots that hold a message and drop them. | 401 | 11 | for i0 in 0..self.len() { | 402 | | // Compute the index of the next slot holding a message. | 403 | 0 | let index = if hix + i < self.cap { | 404 | 0 | hix + i | 405 | | } else { | 406 | 0 | hix + i - self.cap | 407 | | }; | 408 | | | 409 | 0 | unsafe { | 410 | 0 | let p = { | 411 | 0 | let slot = &mut *self.buffer.add(index); | 412 | 0 | let value = &mut *slot.value.get(); | 413 | 0 | value.as_mut_ptr() | 414 | 0 | }; | 415 | 0 | p.drop_in_place(); | 416 | 0 | } | 417 | | } | 418 | | | 419 | | // Finally, deallocate the buffer, but don't run any destructors. | 420 | 11 | unsafe { | 421 | 11 | // Create a slice from the buffer to make | 422 | 11 | // a fat pointer. Then, use Box::from_raw | 423 | 11 | // to deallocate it. | 424 | 11 | let ptr = core::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; | 425 | 11 | Box::from_raw(ptr); | 426 | 11 | } | 427 | 11 | } |
<crossbeam_queue::array_queue::ArrayQueue<()> as core::ops::drop::Drop>::drop Line | Count | Source | 396 | 1 | fn drop(&mut self) { | 397 | 1 | // Get the index of the head. | 398 | 1 | let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1); | 399 | | | 400 | | // Loop over all slots that hold a message and drop them. | 401 | 1 | for i in 0..self.len() { | 402 | | // Compute the index of the next slot holding a message. | 403 | 1 | let index = if hix + i < self.cap { | 404 | 1 | hix + i | 405 | | } else { | 406 | 0 | hix + i - self.cap | 407 | | }; | 408 | | | 409 | 1 | unsafe { | 410 | 1 | let p = { | 411 | 1 | let slot = &mut *self.buffer.add(index); | 412 | 1 | let value = &mut *slot.value.get(); | 413 | 1 | value.as_mut_ptr() | 414 | 1 | }; | 415 | 1 | p.drop_in_place(); | 416 | 1 | } | 417 | | } | 418 | | | 419 | | // Finally, deallocate the buffer, but don't run any destructors. | 420 | 1 | unsafe { | 421 | 1 | // Create a slice from the buffer to make | 422 | 1 | // a fat pointer. Then, use Box::from_raw | 423 | 1 | // to deallocate it. | 424 | 1 | let ptr = core::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; | 425 | 1 | Box::from_raw(ptr); | 426 | 1 | } | 427 | 1 | } |
<crossbeam_queue::array_queue::ArrayQueue<i32> as core::ops::drop::Drop>::drop Line | Count | Source | 396 | 1 | fn drop(&mut self) { | 397 | 1 | // Get the index of the head. | 398 | 1 | let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1); | 399 | | | 400 | | // Loop over all slots that hold a message and drop them. | 401 | 1 | for i0 in 0..self.len() { | 402 | | // Compute the index of the next slot holding a message. | 403 | 0 | let index = if hix + i < self.cap { | 404 | 0 | hix + i | 405 | | } else { | 406 | 0 | hix + i - self.cap | 407 | | }; | 408 | | | 409 | 0 | unsafe { | 410 | 0 | let p = { | 411 | 0 | let slot = &mut *self.buffer.add(index); | 412 | 0 | let value = &mut *slot.value.get(); | 413 | 0 | value.as_mut_ptr() | 414 | 0 | }; | 415 | 0 | p.drop_in_place(); | 416 | 0 | } | 417 | | } | 418 | | | 419 | | // Finally, deallocate the buffer, but don't run any destructors. | 420 | 1 | unsafe { | 421 | 1 | // Create a slice from the buffer to make | 422 | 1 | // a fat pointer. Then, use Box::from_raw | 423 | 1 | // to deallocate it. | 424 | 1 | let ptr = core::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; | 425 | 1 | Box::from_raw(ptr); | 426 | 1 | } | 427 | 1 | } |
|
428 | | } |
429 | | |
impl<T> fmt::Debug for ArrayQueue<T> {
    // Opaque debug output: the queue's contents cannot be observed without
    // synchronizing with concurrent producers/consumers, so only a
    // placeholder is printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` (rather than `write!`) honors any width/alignment/fill
        // flags supplied in the format string.
        f.pad("ArrayQueue { .. }")
    }
}