crossbeam-queue/src/seg_queue.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use alloc::boxed::Box; |
2 | | use core::cell::UnsafeCell; |
3 | | use core::fmt; |
4 | | use core::marker::PhantomData; |
5 | | use core::mem::MaybeUninit; |
6 | | use core::ptr; |
7 | | use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; |
8 | | |
9 | | use crossbeam_utils::{Backoff, CachePadded}; |
10 | | |
11 | | // Bits indicating the state of a slot (combined as a bitmask in `Slot::state`): |
12 | | // * If a value has been written into the slot, `WRITE` is set. |
13 | | // * If a value has been read from the slot, `READ` is set. |
14 | | // * If the block is being destroyed while the slot is still in use, `DESTROY` is set. |
15 | | const WRITE: usize = 1; |
16 | | const READ: usize = 2; |
17 | | const DESTROY: usize = 4; |
18 | | |
19 | | // Each block covers one "lap" of indices; an index's offset in its block is `(index >> SHIFT) % LAP`. |
20 | | const LAP: usize = 32; |
21 | | // The maximum number of values a block can hold; offset `BLOCK_CAP` itself never stores a value. |
22 | | const BLOCK_CAP: usize = LAP - 1; |
23 | | // How many lower bits of an index are reserved for metadata. |
24 | | const SHIFT: usize = 1; |
25 | | // Metadata bit set in the head index to indicate that the block is not the last one. |
26 | | const HAS_NEXT: usize = 1; |
27 | | |
28 | | /// A slot in a block: storage for one value plus the atomic flags that track it. |
29 | | struct Slot<T> { |
30 | | /// The value; uninitialized until `push` writes it and then sets `WRITE`. |
31 | | value: UnsafeCell<MaybeUninit<T>>, |
32 | | |
33 | | /// The state of the slot: a bitmask of the `WRITE`, `READ` and `DESTROY` flags. |
34 | | state: AtomicUsize, |
35 | | } |
36 | | |
37 | | impl<T> Slot<T> { |
38 | | /// Waits until a value is written into the slot. |
39 | 668k | fn wait_write(&self) { |
40 | 668k | let backoff = Backoff::new(); |
41 | 668k | while self.state.load(Ordering::Acquire) & WRITE == 0 { |
42 | 2 | backoff.snooze(); |
43 | 2 | } |
44 | 668k | } <crossbeam_queue::seg_queue::Slot<usize>>::wait_write Line | Count | Source | 39 | 192k | fn wait_write(&self) { | 40 | 192k | let backoff = Backoff::new(); | 41 | 192k | while self.state.load(Ordering::Acquire) & WRITE == 0 { | 42 | 0 | backoff.snooze(); | 43 | 0 | } | 44 | 192k | } |
<crossbeam_queue::seg_queue::Slot<i32>>::wait_write Line | Count | Source | 39 | 2 | fn wait_write(&self) { | 40 | 2 | let backoff = Backoff::new(); | 41 | 2 | while self.state.load(Ordering::Acquire) & WRITE == 0 { | 42 | 0 | backoff.snooze(); | 43 | 0 | } | 44 | 2 | } |
<crossbeam_queue::seg_queue::Slot<()>>::wait_write Line | Count | Source | 39 | 1 | fn wait_write(&self) { | 40 | 1 | let backoff = Backoff::new(); | 41 | 1 | while self.state.load(Ordering::Acquire) & WRITE == 0 { | 42 | 0 | backoff.snooze(); | 43 | 0 | } | 44 | 1 | } |
<crossbeam_queue::seg_queue::Slot<seg_queue::drops::DropCounter>>::wait_write Line | Count | Source | 39 | 475k | fn wait_write(&self) { | 40 | 475k | let backoff = Backoff::new(); | 41 | 475k | while self.state.load(Ordering::Acquire) & WRITE == 0 { | 42 | 2 | backoff.snooze(); | 43 | 2 | } | 44 | 475k | } |
|
45 | | } |
46 | | |
47 | | /// A block in a linked list. |
48 | | /// |
49 | | /// Each block in the list can hold up to `BLOCK_CAP` values. |
50 | | struct Block<T> { |
51 | | /// The next block in the linked list; null until the push that claims this block's last slot installs it. |
52 | | next: AtomicPtr<Block<T>>, |
53 | | |
54 | | /// Slots for values, indexed by an index's offset within the block. |
55 | | slots: [Slot<T>; BLOCK_CAP], |
56 | | } |
57 | | |
58 | | impl<T> Block<T> { |
59 | | /// Creates an empty block: a null `next` pointer and zero-initialized slots. |
60 | 23.5k | fn new() -> Block<T> { |
61 | 23.5k | // SAFETY: This is safe because: |
62 | 23.5k | // [1] `Block::next` (AtomicPtr) may be safely zero initialized. |
63 | 23.5k | // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. |
64 | 23.5k | // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it |
65 | 23.5k | // holds a MaybeUninit. |
66 | 23.5k | // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. |
67 | 23.5k | unsafe { MaybeUninit::zeroed().assume_init() } |
68 | 23.5k | } <crossbeam_queue::seg_queue::Block<i32>>::new Line | Count | Source | 60 | 1 | fn new() -> Block<T> { | 61 | 1 | // SAFETY: This is safe because: | 62 | 1 | // [1] `Block::next` (AtomicPtr) may be safely zero initialized. | 63 | 1 | // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. | 64 | 1 | // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it | 65 | 1 | // holds a MaybeUninit. | 66 | 1 | // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. | 67 | 1 | unsafe { MaybeUninit::zeroed().assume_init() } | 68 | 1 | } |
<crossbeam_queue::seg_queue::Block<seg_queue::drops::DropCounter>>::new Line | Count | Source | 60 | 17.0k | fn new() -> Block<T> { | 61 | 17.0k | // SAFETY: This is safe because: | 62 | 17.0k | // [1] `Block::next` (AtomicPtr) may be safely zero initialized. | 63 | 17.0k | // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. | 64 | 17.0k | // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it | 65 | 17.0k | // holds a MaybeUninit. | 66 | 17.0k | // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. | 67 | 17.0k | unsafe { MaybeUninit::zeroed().assume_init() } | 68 | 17.0k | } |
<crossbeam_queue::seg_queue::Block<()>>::new Line | Count | Source | 60 | 1 | fn new() -> Block<T> { | 61 | 1 | // SAFETY: This is safe because: | 62 | 1 | // [1] `Block::next` (AtomicPtr) may be safely zero initialized. | 63 | 1 | // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. | 64 | 1 | // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it | 65 | 1 | // holds a MaybeUninit. | 66 | 1 | // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. | 67 | 1 | unsafe { MaybeUninit::zeroed().assume_init() } | 68 | 1 | } |
<crossbeam_queue::seg_queue::Block<usize>>::new Line | Count | Source | 60 | 6.50k | fn new() -> Block<T> { | 61 | 6.50k | // SAFETY: This is safe because: | 62 | 6.50k | // [1] `Block::next` (AtomicPtr) may be safely zero initialized. | 63 | 6.50k | // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. | 64 | 6.50k | // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it | 65 | 6.50k | // holds a MaybeUninit. | 66 | 6.50k | // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. | 67 | 6.50k | unsafe { MaybeUninit::zeroed().assume_init() } | 68 | 6.50k | } |
|
69 | | |
70 | | /// Waits until the next pointer is set. |
71 | 21.7k | fn wait_next(&self) -> *mut Block<T> { |
72 | 21.7k | let backoff = Backoff::new(); |
73 | | loop { |
74 | 21.7k | let next = self.next.load(Ordering::Acquire); |
75 | 21.7k | if !next.is_null() { |
76 | 21.7k | return next; |
77 | 2 | } |
78 | 2 | backoff.snooze(); |
79 | | } |
80 | 21.7k | } <crossbeam_queue::seg_queue::Block<usize>>::wait_next Line | Count | Source | 71 | 6.44k | fn wait_next(&self) -> *mut Block<T> { | 72 | 6.44k | let backoff = Backoff::new(); | 73 | | loop { | 74 | 6.44k | let next = self.next.load(Ordering::Acquire); | 75 | 6.44k | if !next.is_null() { | 76 | 6.44k | return next; | 77 | 0 | } | 78 | 0 | backoff.snooze(); | 79 | | } | 80 | 6.44k | } |
<crossbeam_queue::seg_queue::Block<seg_queue::drops::DropCounter>>::wait_next Line | Count | Source | 71 | 15.2k | fn wait_next(&self) -> *mut Block<T> { | 72 | 15.2k | let backoff = Backoff::new(); | 73 | | loop { | 74 | 15.2k | let next = self.next.load(Ordering::Acquire); | 75 | 15.2k | if !next.is_null() { | 76 | 15.2k | return next; | 77 | 2 | } | 78 | 2 | backoff.snooze(); | 79 | | } | 80 | 15.2k | } |
Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<()>>::wait_next Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<i32>>::wait_next |
81 | | |
82 | | /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. |
83 | 21.7k | unsafe fn destroy(this: *mut Block<T>, start: usize) { |
84 | | // It is not necessary to set the `DESTROY` bit in the last slot because that slot has |
85 | | // begun destruction of the block. |
86 | 650k | for i in start..BLOCK_CAP - 1 /* sub-line execution count: 21.7k */ { |
87 | 650k | let slot = (*this).slots.get_unchecked(i); |
88 | | |
89 | | // Mark the `DESTROY` bit if a thread is still using the slot. |
90 | 650k | if slot.state.load(Ordering::Acquire) & READ == 0 |
91 | 1 | && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 |
92 | | { |
93 | | // If a thread is still using the slot, it will continue destruction of the block. |
94 | 1 | return; |
95 | 650k | } |
96 | | } |
97 | | |
98 | | // No thread is using the block, now it is safe to destroy it. |
99 | 21.7k | drop(Box::from_raw(this)); |
100 | 21.7k | } Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<()>>::destroy <crossbeam_queue::seg_queue::Block<usize>>::destroy Line | Count | Source | 83 | 6.44k | unsafe fn destroy(this: *mut Block<T>, start: usize) { | 84 | | // It is not necessary to set the `DESTROY` bit in the last slot because that slot has | 85 | | // begun destruction of the block. | 86 | 191k | for i in start..BLOCK_CAP - 16.44k { | 87 | 191k | let slot = (*this).slots.get_unchecked(i); | 88 | | | 89 | | // Mark the `DESTROY` bit if a thread is still using the slot. | 90 | 191k | if slot.state.load(Ordering::Acquire) & READ == 0 | 91 | 1 | && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 | 92 | | { | 93 | | // If a thread is still using the slot, it will continue destruction of the block. | 94 | 1 | return; | 95 | 192k | } | 96 | | } | 97 | | | 98 | | // No thread is using the block, now it is safe to destroy it. | 99 | 6.44k | drop(Box::from_raw(this)); | 100 | 6.45k | } |
Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<i32>>::destroy <crossbeam_queue::seg_queue::Block<seg_queue::drops::DropCounter>>::destroy Line | Count | Source | 83 | 15.2k | unsafe fn destroy(this: *mut Block<T>, start: usize) { | 84 | | // It is not necessary to set the `DESTROY` bit in the last slot because that slot has | 85 | | // begun destruction of the block. | 86 | 458k | for i in start..BLOCK_CAP - 115.2k { | 87 | 458k | let slot = (*this).slots.get_unchecked(i); | 88 | | | 89 | | // Mark the `DESTROY` bit if a thread is still using the slot. | 90 | 458k | if slot.state.load(Ordering::Acquire) & READ == 0 | 91 | 0 | && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 | 92 | | { | 93 | | // If a thread is still using the slot, it will continue destruction of the block. | 94 | 0 | return; | 95 | 458k | } | 96 | | } | 97 | | | 98 | | // No thread is using the block, now it is safe to destroy it. | 99 | 15.2k | drop(Box::from_raw(this)); | 100 | 15.2k | } |
|
101 | | } |
102 | | |
103 | | /// A position in a queue: one end (head or tail) of the linked list of blocks. |
104 | | struct Position<T> { |
105 | | /// The index in the queue, shifted left by `SHIFT`; the low bits are reserved for metadata. |
106 | | index: AtomicUsize, |
107 | | |
108 | | /// The block in the linked list this position points into; null until the first push allocates one. |
109 | | block: AtomicPtr<Block<T>>, |
110 | | } |
111 | | |
112 | | /// An unbounded multi-producer multi-consumer queue. |
113 | | /// |
114 | | /// This queue is implemented as a linked list of segments, where each segment is a small buffer |
115 | | /// that can hold a handful of elements. There is no limit to how many elements can be in the queue |
116 | | /// at a time. However, since segments need to be dynamically allocated as elements get pushed, |
117 | | /// this queue is somewhat slower than [`ArrayQueue`]. |
118 | | /// |
119 | | /// [`ArrayQueue`]: super::ArrayQueue |
120 | | /// |
121 | | /// # Examples |
122 | | /// |
123 | | /// ``` |
124 | | /// use crossbeam_queue::SegQueue; |
125 | | /// |
126 | | /// let q = SegQueue::new(); |
127 | | /// |
128 | | /// q.push('a'); |
129 | | /// q.push('b'); |
130 | | /// |
131 | | /// assert_eq!(q.pop(), Some('a')); |
132 | | /// assert_eq!(q.pop(), Some('b')); |
133 | | /// assert!(q.pop().is_none()); |
134 | | /// ``` |
135 | | pub struct SegQueue<T> { |
136 | | /// The head of the queue: the position from which `pop` takes elements. |
137 | | head: CachePadded<Position<T>>, |
138 | | |
139 | | /// The tail of the queue: the position at which `push` appends elements. |
140 | | tail: CachePadded<Position<T>>, |
141 | | |
142 | | /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`. |
143 | | _marker: PhantomData<T>, |
144 | | } |
145 | | |
146 | | unsafe impl<T: Send> Send for SegQueue<T> {} |
147 | | unsafe impl<T: Send> Sync for SegQueue<T> {} |
148 | | |
149 | | impl<T> SegQueue<T> { |
150 | | /// Creates a new unbounded queue. |
151 | | /// |
152 | | /// # Examples |
153 | | /// |
154 | | /// ``` |
155 | | /// use crossbeam_queue::SegQueue; |
156 | | /// |
157 | | /// let q = SegQueue::<i32>::new(); |
158 | | /// ``` |
159 | 105 | pub const fn new() -> SegQueue<T> { |
160 | 105 | SegQueue { |
161 | 105 | head: CachePadded::new(Position { |
162 | 105 | block: AtomicPtr::new(ptr::null_mut()), |
163 | 105 | index: AtomicUsize::new(0), |
164 | 105 | }), |
165 | 105 | tail: CachePadded::new(Position { |
166 | 105 | block: AtomicPtr::new(ptr::null_mut()), |
167 | 105 | index: AtomicUsize::new(0), |
168 | 105 | }), |
169 | 105 | _marker: PhantomData, |
170 | 105 | } |
171 | 105 | } <crossbeam_queue::seg_queue::SegQueue<i32>>::new Line | Count | Source | 159 | 1 | pub const fn new() -> SegQueue<T> { | 160 | 1 | SegQueue { | 161 | 1 | head: CachePadded::new(Position { | 162 | 1 | block: AtomicPtr::new(ptr::null_mut()), | 163 | 1 | index: AtomicUsize::new(0), | 164 | 1 | }), | 165 | 1 | tail: CachePadded::new(Position { | 166 | 1 | block: AtomicPtr::new(ptr::null_mut()), | 167 | 1 | index: AtomicUsize::new(0), | 168 | 1 | }), | 169 | 1 | _marker: PhantomData, | 170 | 1 | } | 171 | 1 | } |
<crossbeam_queue::seg_queue::SegQueue<seg_queue::drops::DropCounter>>::new Line | Count | Source | 159 | 100 | pub const fn new() -> SegQueue<T> { | 160 | 100 | SegQueue { | 161 | 100 | head: CachePadded::new(Position { | 162 | 100 | block: AtomicPtr::new(ptr::null_mut()), | 163 | 100 | index: AtomicUsize::new(0), | 164 | 100 | }), | 165 | 100 | tail: CachePadded::new(Position { | 166 | 100 | block: AtomicPtr::new(ptr::null_mut()), | 167 | 100 | index: AtomicUsize::new(0), | 168 | 100 | }), | 169 | 100 | _marker: PhantomData, | 170 | 100 | } | 171 | 100 | } |
<crossbeam_queue::seg_queue::SegQueue<()>>::new Line | Count | Source | 159 | 1 | pub const fn new() -> SegQueue<T> { | 160 | 1 | SegQueue { | 161 | 1 | head: CachePadded::new(Position { | 162 | 1 | block: AtomicPtr::new(ptr::null_mut()), | 163 | 1 | index: AtomicUsize::new(0), | 164 | 1 | }), | 165 | 1 | tail: CachePadded::new(Position { | 166 | 1 | block: AtomicPtr::new(ptr::null_mut()), | 167 | 1 | index: AtomicUsize::new(0), | 168 | 1 | }), | 169 | 1 | _marker: PhantomData, | 170 | 1 | } | 171 | 1 | } |
<crossbeam_queue::seg_queue::SegQueue<usize>>::new Line | Count | Source | 159 | 3 | pub const fn new() -> SegQueue<T> { | 160 | 3 | SegQueue { | 161 | 3 | head: CachePadded::new(Position { | 162 | 3 | block: AtomicPtr::new(ptr::null_mut()), | 163 | 3 | index: AtomicUsize::new(0), | 164 | 3 | }), | 165 | 3 | tail: CachePadded::new(Position { | 166 | 3 | block: AtomicPtr::new(ptr::null_mut()), | 167 | 3 | index: AtomicUsize::new(0), | 168 | 3 | }), | 169 | 3 | _marker: PhantomData, | 170 | 3 | } | 171 | 3 | } |
|
172 | | |
173 | | /// Pushes an element into the queue. |
174 | | /// |
175 | | /// # Examples |
176 | | /// |
177 | | /// ``` |
178 | | /// use crossbeam_queue::SegQueue; |
179 | | /// |
180 | | /// let q = SegQueue::new(); |
181 | | /// |
182 | | /// q.push(10); |
183 | | /// q.push(20); |
184 | | /// ``` |
185 | 718k | pub fn push(&self, value: T) { |
186 | 718k | let backoff = Backoff::new(); |
187 | 718k | let mut tail = self.tail.index.load(Ordering::Acquire); |
188 | 718k | let mut block = self.tail.block.load(Ordering::Acquire); |
189 | 718k | let mut next_block = None; |
190 | | |
191 | 724k | loop { |
192 | 724k | // Calculate the offset of the index into the block. |
193 | 724k | let offset = (tail >> SHIFT) % LAP; |
194 | 724k | |
195 | 724k | // If we reached the end of the block, wait until the next one is installed. |
196 | 724k | if offset == BLOCK_CAP { |
197 | 118 | backoff.snooze(); |
198 | 118 | tail = self.tail.index.load(Ordering::Acquire); |
199 | 118 | block = self.tail.block.load(Ordering::Acquire); |
200 | | continue; |
201 | 724k | } |
202 | | |
203 | | // If we're going to have to install the next block, allocate it in advance in order to |
204 | | // make the wait for other threads as short as possible. |
205 | 724k | if offset + 1 == BLOCK_CAP && next_block.is_none() /* sub-line execution count: 23.8k */ { |
206 | 23.4k | next_block = Some(Box::new(Block::<T>::new())); |
207 | 700k | } |
208 | | |
209 | | // If this is the first push operation, we need to allocate the first block. |
210 | 723k | if block.is_null() { |
211 | 105 | let new = Box::into_raw(Box::new(Block::<T>::new())); |
212 | 105 | |
213 | 105 | if self |
214 | 105 | .tail |
215 | 105 | .block |
216 | 105 | .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) |
217 | 105 | .is_ok() |
218 | 105 | { |
219 | 105 | self.head.block.store(new, Ordering::Release); |
220 | 105 | block = new; |
221 | 105 | } else { |
222 | 0 | next_block = unsafe { Some(Box::from_raw(new)) }; |
223 | 0 | tail = self.tail.index.load(Ordering::Acquire); |
224 | 0 | block = self.tail.block.load(Ordering::Acquire); |
225 | | continue; |
226 | | } |
227 | 723k | } |
228 | | |
229 | 723k | let new_tail = tail + (1 << SHIFT); |
230 | 723k | |
231 | 723k | // Try advancing the tail forward. |
232 | 723k | match self.tail.index.compare_exchange_weak( |
233 | 723k | tail, |
234 | 723k | new_tail, |
235 | 723k | Ordering::SeqCst, |
236 | 723k | Ordering::Acquire, |
237 | 723k | ) { |
238 | 723k | Ok(_) => unsafe { |
239 | | // If we've reached the end of the block, install the next one. |
240 | 716k | if offset + 1 == BLOCK_CAP { |
241 | 23.3k | let next_block = Box::into_raw(next_block.unwrap()); |
242 | 23.3k | let next_index = new_tail.wrapping_add(1 << SHIFT); |
243 | 23.3k | |
244 | 23.3k | self.tail.block.store(next_block, Ordering::Release); |
245 | 23.3k | self.tail.index.store(next_index, Ordering::Release); |
246 | 23.3k | (*block).next.store(next_block, Ordering::Release); |
247 | 693k | } |
248 | | |
249 | | // Write the value into the slot. |
250 | 716k | let slot = (*block).slots.get_unchecked(offset); |
251 | 716k | slot.value.get().write(MaybeUninit::new(value)); |
252 | 716k | slot.state.fetch_or(WRITE, Ordering::Release); |
253 | 716k | |
254 | 716k | return; |
255 | | }, |
256 | 6.72k | Err(t) => { |
257 | 6.72k | tail = t; |
258 | 6.72k | block = self.tail.block.load(Ordering::Acquire); |
259 | 6.72k | backoff.spin(); |
260 | 6.72k | } |
261 | | } |
262 | | } |
263 | 716k | } <crossbeam_queue::seg_queue::SegQueue<usize>>::push Line | Count | Source | 185 | 191k | pub fn push(&self, value: T) { | 186 | 191k | let backoff = Backoff::new(); | 187 | 191k | let mut tail = self.tail.index.load(Ordering::Acquire); | 188 | 191k | let mut block = self.tail.block.load(Ordering::Acquire); | 189 | 191k | let mut next_block = None; | 190 | | | 191 | 198k | loop { | 192 | 198k | // Calculate the offset of the index into the block. | 193 | 198k | let offset = (tail >> SHIFT) % LAP; | 194 | 198k | | 195 | 198k | // If we reached the end of the block, wait until the next one is installed. | 196 | 198k | if offset == BLOCK_CAP { | 197 | 118 | backoff.snooze(); | 198 | 118 | tail = self.tail.index.load(Ordering::Acquire); | 199 | 118 | block = self.tail.block.load(Ordering::Acquire); | 200 | | continue; | 201 | 198k | } | 202 | | | 203 | | // If we're going to have to install the next block, allocate it in advance in order to | 204 | | // make the wait for other threads as short as possible. | 205 | 198k | if offset + 1 == BLOCK_CAP && next_block.is_none()6.89k { | 206 | 6.50k | next_block = Some(Box::new(Block::<T>::new())); | 207 | 190k | } | 208 | | | 209 | | // If this is the first push operation, we need to allocate the first block. 
| 210 | 197k | if block.is_null() { | 211 | 3 | let new = Box::into_raw(Box::new(Block::<T>::new())); | 212 | 3 | | 213 | 3 | if self | 214 | 3 | .tail | 215 | 3 | .block | 216 | 3 | .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) | 217 | 3 | .is_ok() | 218 | 3 | { | 219 | 3 | self.head.block.store(new, Ordering::Release); | 220 | 3 | block = new; | 221 | 3 | } else { | 222 | 0 | next_block = unsafe { Some(Box::from_raw(new)) }; | 223 | 0 | tail = self.tail.index.load(Ordering::Acquire); | 224 | 0 | block = self.tail.block.load(Ordering::Acquire); | 225 | | continue; | 226 | | } | 227 | 197k | } | 228 | | | 229 | 197k | let new_tail = tail + (1 << SHIFT); | 230 | 197k | | 231 | 197k | // Try advancing the tail forward. | 232 | 197k | match self.tail.index.compare_exchange_weak( | 233 | 197k | tail, | 234 | 197k | new_tail, | 235 | 197k | Ordering::SeqCst, | 236 | 197k | Ordering::Acquire, | 237 | 197k | ) { | 238 | 197k | Ok(_) => unsafe { | 239 | | // If we've reached the end of the block, install the next one. | 240 | 190k | if offset + 1 == BLOCK_CAP { | 241 | 6.44k | let next_block = Box::into_raw(next_block.unwrap()); | 242 | 6.44k | let next_index = new_tail.wrapping_add(1 << SHIFT); | 243 | 6.44k | | 244 | 6.44k | self.tail.block.store(next_block, Ordering::Release); | 245 | 6.44k | self.tail.index.store(next_index, Ordering::Release); | 246 | 6.44k | (*block).next.store(next_block, Ordering::Release); | 247 | 183k | } | 248 | | | 249 | | // Write the value into the slot. | 250 | 190k | let slot = (*block).slots.get_unchecked(offset); | 251 | 190k | slot.value.get().write(MaybeUninit::new(value)); | 252 | 190k | slot.state.fetch_or(WRITE, Ordering::Release); | 253 | 190k | | 254 | 190k | return; | 255 | | }, | 256 | 6.72k | Err(t) => { | 257 | 6.72k | tail = t; | 258 | 6.72k | block = self.tail.block.load(Ordering::Acquire); | 259 | 6.72k | backoff.spin(); | 260 | 6.72k | } | 261 | | } | 262 | | } | 263 | 190k | } |
<crossbeam_queue::seg_queue::SegQueue<i32>>::push Line | Count | Source | 185 | 2 | pub fn push(&self, value: T) { | 186 | 2 | let backoff = Backoff::new(); | 187 | 2 | let mut tail = self.tail.index.load(Ordering::Acquire); | 188 | 2 | let mut block = self.tail.block.load(Ordering::Acquire); | 189 | 2 | let mut next_block = None; | 190 | | | 191 | 2 | loop { | 192 | 2 | // Calculate the offset of the index into the block. | 193 | 2 | let offset = (tail >> SHIFT) % LAP; | 194 | 2 | | 195 | 2 | // If we reached the end of the block, wait until the next one is installed. | 196 | 2 | if offset == BLOCK_CAP { | 197 | 0 | backoff.snooze(); | 198 | 0 | tail = self.tail.index.load(Ordering::Acquire); | 199 | 0 | block = self.tail.block.load(Ordering::Acquire); | 200 | | continue; | 201 | 2 | } | 202 | | | 203 | | // If we're going to have to install the next block, allocate it in advance in order to | 204 | | // make the wait for other threads as short as possible. | 205 | 2 | if offset + 1 == BLOCK_CAP && next_block.is_none()0 { | 206 | 0 | next_block = Some(Box::new(Block::<T>::new())); | 207 | 2 | } | 208 | | | 209 | | // If this is the first push operation, we need to allocate the first block. | 210 | 2 | if block.is_null() { | 211 | 1 | let new = Box::into_raw(Box::new(Block::<T>::new())); | 212 | 1 | | 213 | 1 | if self | 214 | 1 | .tail | 215 | 1 | .block | 216 | 1 | .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) | 217 | 1 | .is_ok() | 218 | 1 | { | 219 | 1 | self.head.block.store(new, Ordering::Release); | 220 | 1 | block = new; | 221 | 1 | } else { | 222 | 0 | next_block = unsafe { Some(Box::from_raw(new)) }; | 223 | 0 | tail = self.tail.index.load(Ordering::Acquire); | 224 | 0 | block = self.tail.block.load(Ordering::Acquire); | 225 | | continue; | 226 | | } | 227 | 1 | } | 228 | | | 229 | 2 | let new_tail = tail + (1 << SHIFT); | 230 | 2 | | 231 | 2 | // Try advancing the tail forward. 
| 232 | 2 | match self.tail.index.compare_exchange_weak( | 233 | 2 | tail, | 234 | 2 | new_tail, | 235 | 2 | Ordering::SeqCst, | 236 | 2 | Ordering::Acquire, | 237 | 2 | ) { | 238 | 2 | Ok(_) => unsafe { | 239 | | // If we've reached the end of the block, install the next one. | 240 | 2 | if offset + 1 == BLOCK_CAP { | 241 | 0 | let next_block = Box::into_raw(next_block.unwrap()); | 242 | 0 | let next_index = new_tail.wrapping_add(1 << SHIFT); | 243 | 0 |
| 244 | 0 | self.tail.block.store(next_block, Ordering::Release); | 245 | 0 | self.tail.index.store(next_index, Ordering::Release); | 246 | 0 | (*block).next.store(next_block, Ordering::Release); | 247 | 2 | } | 248 | | | 249 | | // Write the value into the slot. | 250 | 2 | let slot = (*block).slots.get_unchecked(offset); | 251 | 2 | slot.value.get().write(MaybeUninit::new(value)); | 252 | 2 | slot.state.fetch_or(WRITE, Ordering::Release); | 253 | 2 | | 254 | 2 | return; | 255 | | }, | 256 | 0 | Err(t) => { | 257 | 0 | tail = t; | 258 | 0 | block = self.tail.block.load(Ordering::Acquire); | 259 | 0 | backoff.spin(); | 260 | 0 | } | 261 | | } | 262 | | } | 263 | 2 | } |
<crossbeam_queue::seg_queue::SegQueue<()>>::push Line | Count | Source | 185 | 1 | pub fn push(&self, value: T) { | 186 | 1 | let backoff = Backoff::new(); | 187 | 1 | let mut tail = self.tail.index.load(Ordering::Acquire); | 188 | 1 | let mut block = self.tail.block.load(Ordering::Acquire); | 189 | 1 | let mut next_block = None; | 190 | | | 191 | 1 | loop { | 192 | 1 | // Calculate the offset of the index into the block. | 193 | 1 | let offset = (tail >> SHIFT) % LAP; | 194 | 1 | | 195 | 1 | // If we reached the end of the block, wait until the next one is installed. | 196 | 1 | if offset == BLOCK_CAP { | 197 | 0 | backoff.snooze(); | 198 | 0 | tail = self.tail.index.load(Ordering::Acquire); | 199 | 0 | block = self.tail.block.load(Ordering::Acquire); | 200 | | continue; | 201 | 1 | } | 202 | | | 203 | | // If we're going to have to install the next block, allocate it in advance in order to | 204 | | // make the wait for other threads as short as possible. | 205 | 1 | if offset + 1 == BLOCK_CAP && next_block.is_none()0 { | 206 | 0 | next_block = Some(Box::new(Block::<T>::new())); | 207 | 1 | } | 208 | | | 209 | | // If this is the first push operation, we need to allocate the first block. | 210 | 1 | if block.is_null() { | 211 | 1 | let new = Box::into_raw(Box::new(Block::<T>::new())); | 212 | 1 | | 213 | 1 | if self | 214 | 1 | .tail | 215 | 1 | .block | 216 | 1 | .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) | 217 | 1 | .is_ok() | 218 | 1 | { | 219 | 1 | self.head.block.store(new, Ordering::Release); | 220 | 1 | block = new; | 221 | 1 | } else { | 222 | 0 | next_block = unsafe { Some(Box::from_raw(new)) }; | 223 | 0 | tail = self.tail.index.load(Ordering::Acquire); | 224 | 0 | block = self.tail.block.load(Ordering::Acquire); | 225 | | continue; | 226 | | } | 227 | 0 | } | 228 | | | 229 | 1 | let new_tail = tail + (1 << SHIFT); | 230 | 1 | | 231 | 1 | // Try advancing the tail forward. 
| 232 | 1 | match self.tail.index.compare_exchange_weak( | 233 | 1 | tail, | 234 | 1 | new_tail, | 235 | 1 | Ordering::SeqCst, | 236 | 1 | Ordering::Acquire, | 237 | 1 | ) { | 238 | 1 | Ok(_) => unsafe { | 239 | | // If we've reached the end of the block, install the next one. | 240 | 1 | if offset + 1 == BLOCK_CAP { | 241 | 0 | let next_block = Box::into_raw(next_block.unwrap()); | 242 | 0 | let next_index = new_tail.wrapping_add(1 << SHIFT); | 243 | 0 |
| 244 | 0 | self.tail.block.store(next_block, Ordering::Release); | 245 | 0 | self.tail.index.store(next_index, Ordering::Release); | 246 | 0 | (*block).next.store(next_block, Ordering::Release); | 247 | 1 | } | 248 | | | 249 | | // Write the value into the slot. | 250 | 1 | let slot = (*block).slots.get_unchecked(offset); | 251 | 1 | slot.value.get().write(MaybeUninit::new(value)); | 252 | 1 | slot.state.fetch_or(WRITE, Ordering::Release); | 253 | 1 | | 254 | 1 | return; | 255 | | }, | 256 | 0 | Err(t) => { | 257 | 0 | tail = t; | 258 | 0 | block = self.tail.block.load(Ordering::Acquire); | 259 | 0 | backoff.spin(); | 260 | 0 | } | 261 | | } | 262 | | } | 263 | 1 | } |
<crossbeam_queue::seg_queue::SegQueue<seg_queue::drops::DropCounter>>::push Line | Count | Source | 185 | 526k | pub fn push(&self, value: T) { | 186 | 526k | let backoff = Backoff::new(); | 187 | 526k | let mut tail = self.tail.index.load(Ordering::Acquire); | 188 | 526k | let mut block = self.tail.block.load(Ordering::Acquire); | 189 | 526k | let mut next_block = None; | 190 | | | 191 | 526k | loop { | 192 | 526k | // Calculate the offset of the index into the block. | 193 | 526k | let offset = (tail >> SHIFT) % LAP; | 194 | 526k | | 195 | 526k | // If we reached the end of the block, wait until the next one is installed. | 196 | 526k | if offset == BLOCK_CAP { | 197 | 0 | backoff.snooze(); | 198 | 0 | tail = self.tail.index.load(Ordering::Acquire); | 199 | 0 | block = self.tail.block.load(Ordering::Acquire); | 200 | | continue; | 201 | 526k | } | 202 | | | 203 | | // If we're going to have to install the next block, allocate it in advance in order to | 204 | | // make the wait for other threads as short as possible. | 205 | 526k | if offset + 1 == BLOCK_CAP && next_block.is_none()16.9k { | 206 | 16.9k | next_block = Some(Box::new(Block::<T>::new())); | 207 | 509k | } | 208 | | | 209 | | // If this is the first push operation, we need to allocate the first block. 
| 210 | 526k | if block.is_null() { | 211 | 100 | let new = Box::into_raw(Box::new(Block::<T>::new())); | 212 | 100 | | 213 | 100 | if self | 214 | 100 | .tail | 215 | 100 | .block | 216 | 100 | .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) | 217 | 100 | .is_ok() | 218 | 100 | { | 219 | 100 | self.head.block.store(new, Ordering::Release); | 220 | 100 | block = new; | 221 | 100 | } else { | 222 | 0 | next_block = unsafe { Some(Box::from_raw(new)) }; | 223 | 0 | tail = self.tail.index.load(Ordering::Acquire); | 224 | 0 | block = self.tail.block.load(Ordering::Acquire); | 225 | | continue; | 226 | | } | 227 | 526k | } | 228 | | | 229 | 526k | let new_tail = tail + (1 << SHIFT); | 230 | 526k | | 231 | 526k | // Try advancing the tail forward. | 232 | 526k | match self.tail.index.compare_exchange_weak( | 233 | 526k | tail, | 234 | 526k | new_tail, | 235 | 526k | Ordering::SeqCst, | 236 | 526k | Ordering::Acquire, | 237 | 526k | ) { | 238 | 526k | Ok(_) => unsafe { | 239 | | // If we've reached the end of the block, install the next one. | 240 | 526k | if offset + 1 == BLOCK_CAP { | 241 | 16.9k | let next_block = Box::into_raw(next_block.unwrap()); | 242 | 16.9k | let next_index = new_tail.wrapping_add(1 << SHIFT); | 243 | 16.9k | | 244 | 16.9k | self.tail.block.store(next_block, Ordering::Release); | 245 | 16.9k | self.tail.index.store(next_index, Ordering::Release); | 246 | 16.9k | (*block).next.store(next_block, Ordering::Release); | 247 | 509k | } | 248 | | | 249 | | // Write the value into the slot. | 250 | 526k | let slot = (*block).slots.get_unchecked(offset); | 251 | 526k | slot.value.get().write(MaybeUninit::new(value)); | 252 | 526k | slot.state.fetch_or(WRITE, Ordering::Release); | 253 | 526k | | 254 | 526k | return; | 255 | | }, | 256 | 0 | Err(t) => { | 257 | 0 | tail = t; | 258 | 0 | block = self.tail.block.load(Ordering::Acquire); | 259 | 0 | backoff.spin(); | 260 | 0 | } | 261 | | } | 262 | | } | 263 | 526k | } |
|
264 | | |
/// Pops an element from the queue.
///
/// If the queue is empty, `None` is returned.
///
/// The head index is advanced with a compare-and-swap; the value is read from the claimed slot
/// only after that swap succeeds, and the call waits for the slot's `WRITE` bit before reading.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push(10);
/// assert_eq!(q.pop(), Some(10));
/// assert!(q.pop().is_none());
/// ```
pub fn pop(&self) -> Option<T> {
    let backoff = Backoff::new();
    let mut head = self.head.index.load(Ordering::Acquire);
    let mut block = self.head.block.load(Ordering::Acquire);

    loop {
        // Calculate the offset of the index into the block.
        let offset = (head >> SHIFT) % LAP;

        // If we reached the end of the block, wait until the next one is installed.
        // (`BLOCK_CAP` is `LAP - 1`, so the last index of every lap holds no value.)
        if offset == BLOCK_CAP {
            backoff.snooze();
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);
            continue;
        }

        // The head index we will try to install: one position further along.
        let mut new_head = head + (1 << SHIFT);

        // `HAS_NEXT` is the metadata bit of the head index. Only when it is clear do we need
        // to look at the tail to decide whether there is anything to pop.
        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return None;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
            }
        }

        // The block can be null here only if the first push operation is in progress. In that
        // case, just wait until it gets initialized.
        if block.is_null() {
            backoff.snooze();
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);
            continue;
        }

        // Try moving the head index forward.
        match self.head.index.compare_exchange_weak(
            head,
            new_head,
            Ordering::SeqCst,
            Ordering::Acquire,
        ) {
            // The swap succeeded, so the slot at `offset` is ours to read. `block` was checked
            // to be non-null above and `offset` is below `BLOCK_CAP` because of the earlier
            // `continue`.
            // NOTE(review): `get_unchecked` assumes `slots` has at least `BLOCK_CAP` entries;
            // confirm against the `Block` definition.
            Ok(_) => unsafe {
                // If we've reached the end of the block, move to the next one.
                if offset + 1 == BLOCK_CAP {
                    let next = (*block).wait_next();
                    // Clear the metadata bit and step over the lap's final, value-less index.
                    let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                    // Carry `HAS_NEXT` over if the new block already has a successor.
                    if !(*next).next.load(Ordering::Relaxed).is_null() {
                        next_index |= HAS_NEXT;
                    }

                    // Publish the block pointer before the index.
                    self.head.block.store(next, Ordering::Release);
                    self.head.index.store(next_index, Ordering::Release);
                }

                // Read the value.
                let slot = (*block).slots.get_unchecked(offset);
                // Spin until the `WRITE` bit is set on the slot before reading from it.
                slot.wait_write();
                let value = slot.value.get().read().assume_init();

                // Destroy the block if we've reached the end, or if another thread wanted to
                // destroy but couldn't because we were busy reading from the slot.
                if offset + 1 == BLOCK_CAP {
                    Block::destroy(block, 0);
                } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                    // We marked the slot as read and found `DESTROY` already set.
                    // NOTE(review): presumably the second argument is the slot to resume
                    // destruction from; confirm against `Block::destroy`.
                    Block::destroy(block, offset + 1);
                }

                return Some(value);
            },
            // The swap failed: retry with the head value it reported and a freshly loaded
            // block pointer.
            Err(h) => {
                head = h;
                block = self.head.block.load(Ordering::Acquire);
                backoff.spin();
            }
        }
    }
}
|
366 | | |
367 | | /// Returns `true` if the queue is empty. |
368 | | /// |
369 | | /// # Examples |
370 | | /// |
371 | | /// ``` |
372 | | /// use crossbeam_queue::SegQueue; |
373 | | /// |
374 | | /// let q = SegQueue::new(); |
375 | | /// |
376 | | /// assert!(q.is_empty()); |
377 | | /// q.push(1); |
378 | | /// assert!(!q.is_empty()); |
379 | | /// ``` |
380 | 3 | pub fn is_empty(&self) -> bool { |
381 | 3 | let head = self.head.index.load(Ordering::SeqCst); |
382 | 3 | let tail = self.tail.index.load(Ordering::SeqCst); |
383 | 3 | head >> SHIFT == tail >> SHIFT |
384 | 3 | } |
385 | | |
386 | | /// Returns the number of elements in the queue. |
387 | | /// |
388 | | /// # Examples |
389 | | /// |
390 | | /// ``` |
391 | | /// use crossbeam_queue::SegQueue; |
392 | | /// |
393 | | /// let q = SegQueue::new(); |
394 | | /// assert_eq!(q.len(), 0); |
395 | | /// |
396 | | /// q.push(10); |
397 | | /// assert_eq!(q.len(), 1); |
398 | | /// |
399 | | /// q.push(20); |
400 | | /// assert_eq!(q.len(), 2); |
401 | | /// ``` |
402 | 105 | pub fn len(&self) -> usize { |
403 | 105 | loop { |
404 | 105 | // Load the tail index, then load the head index. |
405 | 105 | let mut tail = self.tail.index.load(Ordering::SeqCst); |
406 | 105 | let mut head = self.head.index.load(Ordering::SeqCst); |
407 | 105 | |
408 | 105 | // If the tail index didn't change, we've got consistent indices to work with. |
409 | 105 | if self.tail.index.load(Ordering::SeqCst) == tail { |
410 | | // Erase the lower bits. |
411 | 105 | tail &= !((1 << SHIFT) - 1); |
412 | 105 | head &= !((1 << SHIFT) - 1); |
413 | 105 | |
414 | 105 | // Fix up indices if they fall onto block ends. |
415 | 105 | if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { |
416 | 0 | tail = tail.wrapping_add(1 << SHIFT); |
417 | 105 | } |
418 | 105 | if (head >> SHIFT) & (LAP - 1) == LAP - 1 { |
419 | 0 | head = head.wrapping_add(1 << SHIFT); |
420 | 105 | } |
421 | | |
422 | | // Rotate indices so that head falls into the first block. |
423 | 105 | let lap = (head >> SHIFT) / LAP; |
424 | 105 | tail = tail.wrapping_sub((lap * LAP) << SHIFT); |
425 | 105 | head = head.wrapping_sub((lap * LAP) << SHIFT); |
426 | 105 | |
427 | 105 | // Remove the lower bits. |
428 | 105 | tail >>= SHIFT; |
429 | 105 | head >>= SHIFT; |
430 | 105 | |
431 | 105 | // Return the difference minus the number of blocks between tail and head. |
432 | 105 | return tail - head - tail / LAP; |
433 | 0 | } |
434 | | } |
435 | 105 | } <crossbeam_queue::seg_queue::SegQueue<usize>>::len Line | Count | Source | 402 | 102 | pub fn len(&self) -> usize { | 403 | 102 | loop { | 404 | 102 | // Load the tail index, then load the head index. | 405 | 102 | let mut tail = self.tail.index.load(Ordering::SeqCst); | 406 | 102 | let mut head = self.head.index.load(Ordering::SeqCst); | 407 | 102 | | 408 | 102 | // If the tail index didn't change, we've got consistent indices to work with. | 409 | 102 | if self.tail.index.load(Ordering::SeqCst) == tail { | 410 | | // Erase the lower bits. | 411 | 102 | tail &= !((1 << SHIFT) - 1); | 412 | 102 | head &= !((1 << SHIFT) - 1); | 413 | 102 | | 414 | 102 | // Fix up indices if they fall onto block ends. | 415 | 102 | if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { | 416 | 0 | tail = tail.wrapping_add(1 << SHIFT); | 417 | 102 | } | 418 | 102 | if (head >> SHIFT) & (LAP - 1) == LAP - 1 { | 419 | 0 | head = head.wrapping_add(1 << SHIFT); | 420 | 102 | } | 421 | | | 422 | | // Rotate indices so that head falls into the first block. | 423 | 102 | let lap = (head >> SHIFT) / LAP; | 424 | 102 | tail = tail.wrapping_sub((lap * LAP) << SHIFT); | 425 | 102 | head = head.wrapping_sub((lap * LAP) << SHIFT); | 426 | 102 | | 427 | 102 | // Remove the lower bits. | 428 | 102 | tail >>= SHIFT; | 429 | 102 | head >>= SHIFT; | 430 | 102 | | 431 | 102 | // Return the difference minus the number of blocks between tail and head. | 432 | 102 | return tail - head - tail / LAP; | 433 | 0 | } | 434 | | } | 435 | 102 | } |
<crossbeam_queue::seg_queue::SegQueue<()>>::len Line | Count | Source | 402 | 3 | pub fn len(&self) -> usize { | 403 | 3 | loop { | 404 | 3 | // Load the tail index, then load the head index. | 405 | 3 | let mut tail = self.tail.index.load(Ordering::SeqCst); | 406 | 3 | let mut head = self.head.index.load(Ordering::SeqCst); | 407 | 3 | | 408 | 3 | // If the tail index didn't change, we've got consistent indices to work with. | 409 | 3 | if self.tail.index.load(Ordering::SeqCst) == tail { | 410 | | // Erase the lower bits. | 411 | 3 | tail &= !((1 << SHIFT) - 1); | 412 | 3 | head &= !((1 << SHIFT) - 1); | 413 | 3 | | 414 | 3 | // Fix up indices if they fall onto block ends. | 415 | 3 | if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { | 416 | 0 | tail = tail.wrapping_add(1 << SHIFT); | 417 | 3 | } | 418 | 3 | if (head >> SHIFT) & (LAP - 1) == LAP - 1 { | 419 | 0 | head = head.wrapping_add(1 << SHIFT); | 420 | 3 | } | 421 | | | 422 | | // Rotate indices so that head falls into the first block. | 423 | 3 | let lap = (head >> SHIFT) / LAP; | 424 | 3 | tail = tail.wrapping_sub((lap * LAP) << SHIFT); | 425 | 3 | head = head.wrapping_sub((lap * LAP) << SHIFT); | 426 | 3 | | 427 | 3 | // Remove the lower bits. | 428 | 3 | tail >>= SHIFT; | 429 | 3 | head >>= SHIFT; | 430 | 3 | | 431 | 3 | // Return the difference minus the number of blocks between tail and head. | 432 | 3 | return tail - head - tail / LAP; | 433 | 0 | } | 434 | | } | 435 | 3 | } |
|
436 | | } |
437 | | |
438 | | impl<T> Drop for SegQueue<T> { |
439 | 105 | fn drop(&mut self) { |
440 | 105 | let mut head = self.head.index.load(Ordering::Relaxed); |
441 | 105 | let mut tail = self.tail.index.load(Ordering::Relaxed); |
442 | 105 | let mut block = self.head.block.load(Ordering::Relaxed); |
443 | 105 | |
444 | 105 | // Erase the lower bits. |
445 | 105 | head &= !((1 << SHIFT) - 1); |
446 | 105 | tail &= !((1 << SHIFT) - 1); |
447 | | |
448 | | unsafe { |
449 | | // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. |
450 | 52.6k | while head != tail { |
451 | 52.5k | let offset = (head >> SHIFT) % LAP; |
452 | 52.5k | |
453 | 52.5k | if offset < BLOCK_CAP { |
454 | 50.8k | // Drop the value in the slot. |
455 | 50.8k | let slot = (*block).slots.get_unchecked(offset); |
456 | 50.8k | let p = &mut *slot.value.get(); |
457 | 50.8k | p.as_mut_ptr().drop_in_place(); |
458 | 50.8k | } else { |
459 | 1.64k | // Deallocate the block and move to the next one. |
460 | 1.64k | let next = (*block).next.load(Ordering::Relaxed); |
461 | 1.64k | drop(Box::from_raw(block)); |
462 | 1.64k | block = next; |
463 | 1.64k | } |
464 | | |
465 | 52.5k | head = head.wrapping_add(1 << SHIFT); |
466 | | } |
467 | | |
468 | | // Deallocate the last remaining block. |
469 | 105 | if !block.is_null() { |
470 | 105 | drop(Box::from_raw(block)); |
471 | 105 | }0 |
472 | | } |
473 | 105 | } <crossbeam_queue::seg_queue::SegQueue<()> as core::ops::drop::Drop>::drop Line | Count | Source | 439 | 1 | fn drop(&mut self) { | 440 | 1 | let mut head = self.head.index.load(Ordering::Relaxed); | 441 | 1 | let mut tail = self.tail.index.load(Ordering::Relaxed); | 442 | 1 | let mut block = self.head.block.load(Ordering::Relaxed); | 443 | 1 | | 444 | 1 | // Erase the lower bits. | 445 | 1 | head &= !((1 << SHIFT) - 1); | 446 | 1 | tail &= !((1 << SHIFT) - 1); | 447 | | | 448 | | unsafe { | 449 | | // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. | 450 | 1 | while head != tail { | 451 | 0 | let offset = (head >> SHIFT) % LAP; | 452 | 0 |
| 453 | 0 | if offset < BLOCK_CAP { | 454 | 0 | // Drop the value in the slot. | 455 | 0 | let slot = (*block).slots.get_unchecked(offset); | 456 | 0 | let p = &mut *slot.value.get(); | 457 | 0 | p.as_mut_ptr().drop_in_place(); | 458 | 0 | } else { | 459 | 0 | // Deallocate the block and move to the next one. | 460 | 0 | let next = (*block).next.load(Ordering::Relaxed); | 461 | 0 | drop(Box::from_raw(block)); | 462 | 0 | block = next; | 463 | 0 | } | 464 | | | 465 | 0 | head = head.wrapping_add(1 << SHIFT); | 466 | | } | 467 | | | 468 | | // Deallocate the last remaining block. | 469 | 1 | if !block.is_null() { | 470 | 1 | drop(Box::from_raw(block)); | 471 | 1 | }0 | 472 | | } | 473 | 1 | } |
<crossbeam_queue::seg_queue::SegQueue<seg_queue::drops::DropCounter> as core::ops::drop::Drop>::drop Line | Count | Source | 439 | 100 | fn drop(&mut self) { | 440 | 100 | let mut head = self.head.index.load(Ordering::Relaxed); | 441 | 100 | let mut tail = self.tail.index.load(Ordering::Relaxed); | 442 | 100 | let mut block = self.head.block.load(Ordering::Relaxed); | 443 | 100 | | 444 | 100 | // Erase the lower bits. | 445 | 100 | head &= !((1 << SHIFT) - 1); | 446 | 100 | tail &= !((1 << SHIFT) - 1); | 447 | | | 448 | | unsafe { | 449 | | // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. | 450 | 52.6k | while head != tail { | 451 | 52.5k | let offset = (head >> SHIFT) % LAP; | 452 | 52.5k | | 453 | 52.5k | if offset < BLOCK_CAP { | 454 | 50.8k | // Drop the value in the slot. | 455 | 50.8k | let slot = (*block).slots.get_unchecked(offset); | 456 | 50.8k | let p = &mut *slot.value.get(); | 457 | 50.8k | p.as_mut_ptr().drop_in_place(); | 458 | 50.8k | } else { | 459 | 1.64k | // Deallocate the block and move to the next one. | 460 | 1.64k | let next = (*block).next.load(Ordering::Relaxed); | 461 | 1.64k | drop(Box::from_raw(block)); | 462 | 1.64k | block = next; | 463 | 1.64k | } | 464 | | | 465 | 52.5k | head = head.wrapping_add(1 << SHIFT); | 466 | | } | 467 | | | 468 | | // Deallocate the last remaining block. | 469 | 100 | if !block.is_null() { | 470 | 100 | drop(Box::from_raw(block)); | 471 | 100 | }0 | 472 | | } | 473 | 100 | } |
<crossbeam_queue::seg_queue::SegQueue<usize> as core::ops::drop::Drop>::drop Line | Count | Source | 439 | 3 | fn drop(&mut self) { | 440 | 3 | let mut head = self.head.index.load(Ordering::Relaxed); | 441 | 3 | let mut tail = self.tail.index.load(Ordering::Relaxed); | 442 | 3 | let mut block = self.head.block.load(Ordering::Relaxed); | 443 | 3 | | 444 | 3 | // Erase the lower bits. | 445 | 3 | head &= !((1 << SHIFT) - 1); | 446 | 3 | tail &= !((1 << SHIFT) - 1); | 447 | | | 448 | | unsafe { | 449 | | // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. | 450 | 3 | while head != tail { | 451 | 0 | let offset = (head >> SHIFT) % LAP; | 452 | 0 |
| 453 | 0 | if offset < BLOCK_CAP { | 454 | 0 | // Drop the value in the slot. | 455 | 0 | let slot = (*block).slots.get_unchecked(offset); | 456 | 0 | let p = &mut *slot.value.get(); | 457 | 0 | p.as_mut_ptr().drop_in_place(); | 458 | 0 | } else { | 459 | 0 | // Deallocate the block and move to the next one. | 460 | 0 | let next = (*block).next.load(Ordering::Relaxed); | 461 | 0 | drop(Box::from_raw(block)); | 462 | 0 | block = next; | 463 | 0 | } | 464 | | | 465 | 0 | head = head.wrapping_add(1 << SHIFT); | 466 | | } | 467 | | | 468 | | // Deallocate the last remaining block. | 469 | 3 | if !block.is_null() { | 470 | 3 | drop(Box::from_raw(block)); | 471 | 3 | }0 | 472 | | } | 473 | 3 | } |
<crossbeam_queue::seg_queue::SegQueue<i32> as core::ops::drop::Drop>::drop Line | Count | Source | 439 | 1 | fn drop(&mut self) { | 440 | 1 | let mut head = self.head.index.load(Ordering::Relaxed); | 441 | 1 | let mut tail = self.tail.index.load(Ordering::Relaxed); | 442 | 1 | let mut block = self.head.block.load(Ordering::Relaxed); | 443 | 1 | | 444 | 1 | // Erase the lower bits. | 445 | 1 | head &= !((1 << SHIFT) - 1); | 446 | 1 | tail &= !((1 << SHIFT) - 1); | 447 | | | 448 | | unsafe { | 449 | | // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. | 450 | 1 | while head != tail { | 451 | 0 | let offset = (head >> SHIFT) % LAP; | 452 | 0 |
| 453 | 0 | if offset < BLOCK_CAP { | 454 | 0 | // Drop the value in the slot. | 455 | 0 | let slot = (*block).slots.get_unchecked(offset); | 456 | 0 | let p = &mut *slot.value.get(); | 457 | 0 | p.as_mut_ptr().drop_in_place(); | 458 | 0 | } else { | 459 | 0 | // Deallocate the block and move to the next one. | 460 | 0 | let next = (*block).next.load(Ordering::Relaxed); | 461 | 0 | drop(Box::from_raw(block)); | 462 | 0 | block = next; | 463 | 0 | } | 464 | | | 465 | 0 | head = head.wrapping_add(1 << SHIFT); | 466 | | } | 467 | | | 468 | | // Deallocate the last remaining block. | 469 | 1 | if !block.is_null() { | 470 | 1 | drop(Box::from_raw(block)); | 471 | 1 | }0 | 472 | | } | 473 | 1 | } |
|
474 | | } |
475 | | |
476 | | impl<T> fmt::Debug for SegQueue<T> { |
477 | | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
478 | | f.pad("SegQueue { .. }") |
479 | | } |
480 | | } |
481 | | |
482 | | impl<T> Default for SegQueue<T> { |
483 | | fn default() -> SegQueue<T> { |
484 | | SegQueue::new() |
485 | | } |
486 | | } |