Coverage Report

Created: 2021-01-22 16:54

crossbeam-queue/src/seg_queue.rs
Line | Count | Source
   1 |       | use alloc::boxed::Box;
   2 |       | use core::cell::UnsafeCell;
   3 |       | use core::fmt;
   4 |       | use core::marker::PhantomData;
   5 |       | use core::mem::MaybeUninit;
   6 |       | use core::ptr;
   7 |       | use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
   8 |       |
   9 |       | use crossbeam_utils::{Backoff, CachePadded};
  10 |       |
  11 |       | // Bits indicating the state of a slot:
  12 |       | // * If a value has been written into the slot, `WRITE` is set.
  13 |       | // * If a value has been read from the slot, `READ` is set.
  14 |       | // * If the block is being destroyed, `DESTROY` is set.
  15 |       | const WRITE: usize = 1;
  16 |       | const READ: usize = 2;
  17 |       | const DESTROY: usize = 4;
  18 |       |
  19 |       | // Each block covers one "lap" of indices.
  20 |       | const LAP: usize = 32;
  21 |       | // The maximum number of values a block can hold.
  22 |       | const BLOCK_CAP: usize = LAP - 1;
  23 |       | // How many lower bits are reserved for metadata.
  24 |       | const SHIFT: usize = 1;
  25 |       | // Indicates that the block is not the last one.
  26 |       | const HAS_NEXT: usize = 1;
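
Note on the index layout: these constants encode a queue index as (lap, offset, metadata). A minimal sketch of the decoding, assuming only the constants above; `decode` is a hypothetical helper for illustration, not part of crossbeam-queue:

    const LAP: usize = 32;
    const SHIFT: usize = 1;
    const HAS_NEXT: usize = 1;

    /// Splits an index into (lap, offset within the block, HAS_NEXT flag).
    fn decode(index: usize) -> (usize, usize, bool) {
        let has_next = index & HAS_NEXT != 0; // the lowest SHIFT bits are metadata
        let pos = index >> SHIFT;             // logical position in the queue
        (pos / LAP, pos % LAP, has_next)
    }

    fn main() {
        // Offset 31 (== BLOCK_CAP) is the reserved end-of-block position: it
        // never holds a value, it only marks that the next block follows.
        assert_eq!(decode(31 << SHIFT), (0, 31, false));
        // Position 32 is the first real slot of the second block.
        assert_eq!(decode((32 << SHIFT) | HAS_NEXT), (1, 0, true));
    }
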
  27 |       |
  28 |       | /// A slot in a block.
  29 |       | struct Slot<T> {
  30 |       |     /// The value.
  31 |       |     value: UnsafeCell<MaybeUninit<T>>,
  32 |       |
  33 |       |     /// The state of the slot.
  34 |       |     state: AtomicUsize,
  35 |       | }
  36 |       |
  37 |       | impl<T> Slot<T> {
  38 |       |     /// Waits until a value is written into the slot.
  39 |  668k |     fn wait_write(&self) {
  40 |  668k |         let backoff = Backoff::new();
  41 |  668k |         while self.state.load(Ordering::Acquire) & WRITE == 0 {
  42 |     2 |             backoff.snooze();
  43 |     2 |         }
  44 |  668k |     }
Instantiations:
  <crossbeam_queue::seg_queue::Slot<usize>>::wait_write: 192k calls (0 snoozes)
  <crossbeam_queue::seg_queue::Slot<i32>>::wait_write: 2 calls (0 snoozes)
  <crossbeam_queue::seg_queue::Slot<()>>::wait_write: 1 call (0 snoozes)
  <crossbeam_queue::seg_queue::Slot<seg_queue::drops::DropCounter>>::wait_write: 475k calls (2 snoozes)
  45 |       | }
  46 |       |
  47 |       | /// A block in a linked list.
  48 |       | ///
  49 |       | /// Each block in the list can hold up to `BLOCK_CAP` values.
  50 |       | struct Block<T> {
  51 |       |     /// The next block in the linked list.
  52 |       |     next: AtomicPtr<Block<T>>,
  53 |       |
  54 |       |     /// Slots for values.
  55 |       |     slots: [Slot<T>; BLOCK_CAP],
  56 |       | }
  57 |       |
  58 |       | impl<T> Block<T> {
  59 |       |     /// Creates an empty block.
  60 | 23.5k |     fn new() -> Block<T> {
  61 | 23.5k |         // SAFETY: This is safe because:
  62 | 23.5k |         //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
  63 | 23.5k |         //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
  64 | 23.5k |         //  [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
  65 | 23.5k |         //       holds a MaybeUninit.
  66 | 23.5k |         //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
  67 | 23.5k |         unsafe { MaybeUninit::zeroed().assume_init() }
  68 | 23.5k |     }
Instantiations:
  <crossbeam_queue::seg_queue::Block<i32>>::new: 1 call
  <crossbeam_queue::seg_queue::Block<seg_queue::drops::DropCounter>>::new: 17.0k calls
  <crossbeam_queue::seg_queue::Block<()>>::new: 1 call
  <crossbeam_queue::seg_queue::Block<usize>>::new: 6.50k calls
  69 |       |
  70 |       |     /// Waits until the next pointer is set.
  71 | 21.7k |     fn wait_next(&self) -> *mut Block<T> {
  72 | 21.7k |         let backoff = Backoff::new();
  73 |       |         loop {
  74 | 21.7k |             let next = self.next.load(Ordering::Acquire);
  75 | 21.7k |             if !next.is_null() {
  76 | 21.7k |                 return next;
  77 |     2 |             }
  78 |     2 |             backoff.snooze();
  79 |       |         }
  80 | 21.7k |     }
Instantiations:
  <crossbeam_queue::seg_queue::Block<usize>>::wait_next: 6.44k calls (0 snoozes)
  <crossbeam_queue::seg_queue::Block<seg_queue::drops::DropCounter>>::wait_next: 15.2k calls (2 snoozes)
  Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<()>>::wait_next
  Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<i32>>::wait_next
  81 |       |
  82 |       |     /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
  83 | 21.7k |     unsafe fn destroy(this: *mut Block<T>, start: usize) {
  84 |       |         // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
  85 |       |         // begun destruction of the block.
  86 | 650k, 21.7k |         for i in start..BLOCK_CAP - 1 {
  87 |  650k |             let slot = (*this).slots.get_unchecked(i);
  88 |       |
  89 |       |             // Mark the `DESTROY` bit if a thread is still using the slot.
  90 |  650k |             if slot.state.load(Ordering::Acquire) & READ == 0
  91 |     1 |                 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
  92 |       |             {
  93 |       |                 // If a thread is still using the slot, it will continue destruction of the block.
  94 |     1 |                 return;
  95 |  650k |             }
  96 |       |         }
  97 |       |
  98 |       |         // No thread is using the block, now it is safe to destroy it.
  99 | 21.7k |         drop(Box::from_raw(this));
 100 | 21.7k |     }
Instantiations:
  Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<()>>::destroy
  <crossbeam_queue::seg_queue::Block<usize>>::destroy: 6.44k calls (1 early return on DESTROY)
  Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<i32>>::destroy
  <crossbeam_queue::seg_queue::Block<seg_queue::drops::DropCounter>>::destroy: 15.2k calls (0 early returns)
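
The hand-off between `destroy` and a lagging reader (line 352 below) guarantees that exactly one side frees the block. A standalone sketch of the same two-bit protocol on a single slot, simplified for illustration from the code above:

    use std::sync::atomic::{AtomicUsize, Ordering};

    const READ: usize = 2;
    const DESTROY: usize = 4;

    // Destroyer: free only if the reader already marked READ; otherwise set
    // DESTROY and leave the freeing to the reader.
    fn destroyer_may_free(state: &AtomicUsize) -> bool {
        state.load(Ordering::Acquire) & READ != 0
            || state.fetch_or(DESTROY, Ordering::AcqRel) & READ != 0
    }

    // Reader: mark READ when done; if DESTROY was already set, the destroyer
    // gave up and the reader must finish destruction itself.
    fn reader_must_free(state: &AtomicUsize) -> bool {
        state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0
    }

    fn main() {
        // Reader finishes first, so the destroyer is the one that frees.
        let state = AtomicUsize::new(0);
        assert!(!reader_must_free(&state));
        assert!(destroyer_may_free(&state));
    }
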
 101 |       | }
 102 |       |
 103 |       | /// A position in a queue.
 104 |       | struct Position<T> {
 105 |       |     /// The index in the queue.
 106 |       |     index: AtomicUsize,
 107 |       |
 108 |       |     /// The block in the linked list.
 109 |       |     block: AtomicPtr<Block<T>>,
 110 |       | }
 111 |       |
 112 |       | /// An unbounded multi-producer multi-consumer queue.
 113 |       | ///
 114 |       | /// This queue is implemented as a linked list of segments, where each segment is a small buffer
 115 |       | /// that can hold a handful of elements. There is no limit to how many elements can be in the queue
 116 |       | /// at a time. However, since segments need to be dynamically allocated as elements get pushed,
 117 |       | /// this queue is somewhat slower than [`ArrayQueue`].
 118 |       | ///
 119 |       | /// [`ArrayQueue`]: super::ArrayQueue
 120 |       | ///
 121 |       | /// # Examples
 122 |       | ///
 123 |       | /// ```
 124 |       | /// use crossbeam_queue::SegQueue;
 125 |       | ///
 126 |       | /// let q = SegQueue::new();
 127 |       | ///
 128 |       | /// q.push('a');
 129 |       | /// q.push('b');
 130 |       | ///
 131 |       | /// assert_eq!(q.pop(), Some('a'));
 132 |       | /// assert_eq!(q.pop(), Some('b'));
 133 |       | /// assert!(q.pop().is_none());
 134 |       | /// ```
 135 |       | pub struct SegQueue<T> {
 136 |       |     /// The head of the queue.
 137 |       |     head: CachePadded<Position<T>>,
 138 |       |
 139 |       |     /// The tail of the queue.
 140 |       |     tail: CachePadded<Position<T>>,
 141 |       |
 142 |       |     /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
 143 |       |     _marker: PhantomData<T>,
 144 |       | }
 145 |       |
 146 |       | unsafe impl<T: Send> Send for SegQueue<T> {}
 147 |       | unsafe impl<T: Send> Sync for SegQueue<T> {}
 148 |       |
 149 |       | impl<T> SegQueue<T> {
 150 |       |     /// Creates a new unbounded queue.
 151 |       |     ///
 152 |       |     /// # Examples
 153 |       |     ///
 154 |       |     /// ```
 155 |       |     /// use crossbeam_queue::SegQueue;
 156 |       |     ///
 157 |       |     /// let q = SegQueue::<i32>::new();
 158 |       |     /// ```
 159 |   105 |     pub const fn new() -> SegQueue<T> {
 160 |   105 |         SegQueue {
 161 |   105 |             head: CachePadded::new(Position {
 162 |   105 |                 block: AtomicPtr::new(ptr::null_mut()),
 163 |   105 |                 index: AtomicUsize::new(0),
 164 |   105 |             }),
 165 |   105 |             tail: CachePadded::new(Position {
 166 |   105 |                 block: AtomicPtr::new(ptr::null_mut()),
 167 |   105 |                 index: AtomicUsize::new(0),
 168 |   105 |             }),
 169 |   105 |             _marker: PhantomData,
 170 |   105 |         }
 171 |   105 |     }
Instantiations:
  <crossbeam_queue::seg_queue::SegQueue<i32>>::new: 1 call
  <crossbeam_queue::seg_queue::SegQueue<seg_queue::drops::DropCounter>>::new: 100 calls
  <crossbeam_queue::seg_queue::SegQueue<()>>::new: 1 call
  <crossbeam_queue::seg_queue::SegQueue<usize>>::new: 3 calls
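
Since `SegQueue` is `Send` and `Sync` for `T: Send` (lines 146-147), it can be shared across threads behind an `Arc`. A small usage sketch:

    use std::sync::Arc;
    use std::thread;
    use crossbeam_queue::SegQueue;

    fn main() {
        let q = Arc::new(SegQueue::new());

        // A producer thread pushes a thousand values.
        let producer = {
            let q = Arc::clone(&q);
            thread::spawn(move || {
                for i in 0..1000 {
                    q.push(i);
                }
            })
        };

        // The main thread pops until everything has been drained; `pop`
        // returns `None` whenever the queue is momentarily empty, so retry.
        let mut received = 0;
        while received < 1000 {
            if q.pop().is_some() {
                received += 1;
            }
        }
        producer.join().unwrap();
    }
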
 172 |       |
 173 |       |     /// Pushes an element into the queue.
 174 |       |     ///
 175 |       |     /// # Examples
 176 |       |     ///
 177 |       |     /// ```
 178 |       |     /// use crossbeam_queue::SegQueue;
 179 |       |     ///
 180 |       |     /// let q = SegQueue::new();
 181 |       |     ///
 182 |       |     /// q.push(10);
 183 |       |     /// q.push(20);
 184 |       |     /// ```
 185 |  718k |     pub fn push(&self, value: T) {
 186 |  718k |         let backoff = Backoff::new();
 187 |  718k |         let mut tail = self.tail.index.load(Ordering::Acquire);
 188 |  718k |         let mut block = self.tail.block.load(Ordering::Acquire);
 189 |  718k |         let mut next_block = None;
 190 |       |
 191 |  724k |         loop {
 192 |  724k |             // Calculate the offset of the index into the block.
 193 |  724k |             let offset = (tail >> SHIFT) % LAP;
 194 |  724k |
 195 |  724k |             // If we reached the end of the block, wait until the next one is installed.
 196 |  724k |             if offset == BLOCK_CAP {
 197 |   118 |                 backoff.snooze();
 198 |   118 |                 tail = self.tail.index.load(Ordering::Acquire);
 199 |   118 |                 block = self.tail.block.load(Ordering::Acquire);
 200 |       |                 continue;
 201 |  724k |             }
 202 |       |
 203 |       |             // If we're going to have to install the next block, allocate it in advance in order to
 204 |       |             // make the wait for other threads as short as possible.
 205 | 724k, 23.8k |             if offset + 1 == BLOCK_CAP && next_block.is_none() {
 206 | 23.4k |                 next_block = Some(Box::new(Block::<T>::new()));
 207 |  700k |             }
 208 |       |
 209 |       |             // If this is the first push operation, we need to allocate the first block.
 210 |  723k |             if block.is_null() {
 211 |   105 |                 let new = Box::into_raw(Box::new(Block::<T>::new()));
 212 |   105 |
 213 |   105 |                 if self
 214 |   105 |                     .tail
 215 |   105 |                     .block
 216 |   105 |                     .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
 217 |   105 |                     .is_ok()
 218 |   105 |                 {
 219 |   105 |                     self.head.block.store(new, Ordering::Release);
 220 |   105 |                     block = new;
 221 |   105 |                 } else {
 222 |     0 |                     next_block = unsafe { Some(Box::from_raw(new)) };
 223 |     0 |                     tail = self.tail.index.load(Ordering::Acquire);
 224 |     0 |                     block = self.tail.block.load(Ordering::Acquire);
 225 |       |                     continue;
 226 |       |                 }
 227 |  723k |             }
 228 |       |
 229 |  723k |             let new_tail = tail + (1 << SHIFT);
 230 |  723k |
 231 |  723k |             // Try advancing the tail forward.
 232 |  723k |             match self.tail.index.compare_exchange_weak(
 233 |  723k |                 tail,
 234 |  723k |                 new_tail,
 235 |  723k |                 Ordering::SeqCst,
 236 |  723k |                 Ordering::Acquire,
 237 |  723k |             ) {
 238 |  723k |                 Ok(_) => unsafe {
 239 |       |                     // If we've reached the end of the block, install the next one.
 240 |  716k |                     if offset + 1 == BLOCK_CAP {
 241 | 23.3k |                         let next_block = Box::into_raw(next_block.unwrap());
 242 | 23.3k |                         let next_index = new_tail.wrapping_add(1 << SHIFT);
 243 | 23.3k |
 244 | 23.3k |                         self.tail.block.store(next_block, Ordering::Release);
 245 | 23.3k |                         self.tail.index.store(next_index, Ordering::Release);
 246 | 23.3k |                         (*block).next.store(next_block, Ordering::Release);
 247 |  693k |                     }
 248 |       |
 249 |       |                     // Write the value into the slot.
 250 |  716k |                     let slot = (*block).slots.get_unchecked(offset);
 251 |  716k |                     slot.value.get().write(MaybeUninit::new(value));
 252 |  716k |                     slot.state.fetch_or(WRITE, Ordering::Release);
 253 |  716k |
 254 |  716k |                     return;
 255 |       |                 },
 256 | 6.72k |                 Err(t) => {
 257 | 6.72k |                     tail = t;
 258 | 6.72k |                     block = self.tail.block.load(Ordering::Acquire);
 259 | 6.72k |                     backoff.spin();
 260 | 6.72k |                 }
 261 |       |             }
 262 |       |         }
 263 |  716k |     }
Instantiations:
  <crossbeam_queue::seg_queue::SegQueue<usize>>::push: 191k calls (6.72k CAS retries, 118 end-of-block snoozes)
  <crossbeam_queue::seg_queue::SegQueue<i32>>::push: 2 calls
  <crossbeam_queue::seg_queue::SegQueue<()>>::push: 1 call
  <crossbeam_queue::seg_queue::SegQueue<seg_queue::drops::DropCounter>>::push: 526k calls (0 CAS retries)
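
A quick way to exercise the end-of-block path in `push` (lines 205-206 and 240-246) is to push past one block's capacity; a sketch assuming the constants above (`BLOCK_CAP` = 31):

    use crossbeam_queue::SegQueue;

    fn main() {
        let q = SegQueue::new();
        // 40 pushes cross the 31-slot block boundary, so the pusher claiming
        // offset 30 (the last real slot) also installs a second block.
        for i in 0..40 {
            q.push(i);
        }
        // FIFO order holds across the block boundary.
        for i in 0..40 {
            assert_eq!(q.pop(), Some(i));
        }
        assert!(q.pop().is_none());
    }
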
 264 |       |
 265 |       |     /// Pops an element from the queue.
 266 |       |     ///
 267 |       |     /// If the queue is empty, `None` is returned.
 268 |       |     ///
 269 |       |     /// # Examples
 270 |       |     ///
 271 |       |     /// ```
 272 |       |     /// use crossbeam_queue::SegQueue;
 273 |       |     ///
 274 |       |     /// let q = SegQueue::new();
 275 |       |     ///
 276 |       |     /// q.push(10);
 277 |       |     /// assert_eq!(q.pop(), Some(10));
 278 |       |     /// assert!(q.pop().is_none());
 279 |       |     /// ```
 280 |  731k |     pub fn pop(&self) -> Option<T> {
 281 |  731k |         let backoff = Backoff::new();
 282 |  731k |         let mut head = self.head.index.load(Ordering::Acquire);
 283 |  731k |         let mut block = self.head.block.load(Ordering::Acquire);
 284 |       |
 285 |  742k |         loop {
 286 |  742k |             // Calculate the offset of the index into the block.
 287 |  742k |             let offset = (head >> SHIFT) % LAP;
 288 |  742k |
 289 |  742k |             // If we reached the end of the block, wait until the next one is installed.
 290 |  742k |             if offset == BLOCK_CAP {
 291 |   169 |                 backoff.snooze();
 292 |   169 |                 head = self.head.index.load(Ordering::Acquire);
 293 |   169 |                 block = self.head.block.load(Ordering::Acquire);
 294 |       |                 continue;
 295 |  741k |             }
 296 |  741k |
 297 |  741k |             let mut new_head = head + (1 << SHIFT);
 298 |  741k |
 299 |  741k |             if new_head & HAS_NEXT == 0 {
 300 | 68.8k |                 atomic::fence(Ordering::SeqCst);
 301 | 68.8k |                 let tail = self.tail.index.load(Ordering::Relaxed);
 302 | 68.8k |
 303 | 68.8k |                 // If the tail equals the head, that means the queue is empty.
 304 | 68.8k |                 if head >> SHIFT == tail >> SHIFT {
 305 | 64.3k |                     return None;
 306 | 4.50k |                 }
 307 | 4.50k |
 308 | 4.50k |                 // If head and tail are not in the same block, set `HAS_NEXT` in head.
 309 | 4.50k |                 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
 310 |   211 |                     new_head |= HAS_NEXT;
 311 | 3.62k |                 }
 312 |  673k |             }
 313 |       |
 314 |       |             // The block can be null here only if the first push operation is in progress. In that
 315 |       |             // case, just wait until it gets initialized.
 316 |  676k |             if block.is_null() {
 317 |    21 |                 backoff.snooze();
 318 |    21 |                 head = self.head.index.load(Ordering::Acquire);
 319 |    21 |                 block = self.head.block.load(Ordering::Acquire);
 320 |       |                 continue;
 321 |  676k |             }
 322 |  676k |
 323 |  676k |             // Try moving the head index forward.
 324 |  676k |             match self.head.index.compare_exchange_weak(
 325 |  676k |                 head,
 326 |  676k |                 new_head,
 327 |  676k |                 Ordering::SeqCst,
 328 |  676k |                 Ordering::Acquire,
 329 |  676k |             ) {
 330 |  676k |                 Ok(_) => unsafe {
 331 |       |                     // If we've reached the end of the block, move to the next one.
 332 |  666k |                     if offset + 1 == BLOCK_CAP {
 333 | 21.7k |                         let next = (*block).wait_next();
 334 | 21.7k |                         let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
 335 | 21.7k |                         if !(*next).next.load(Ordering::Relaxed).is_null() {
 336 | 21.5k |                             next_index |= HAS_NEXT;
 337 | 21.5k, 207 |                         }
 338 |       |
 339 | 21.7k |                         self.head.block.store(next, Ordering::Release);
 340 | 21.7k |                         self.head.index.store(next_index, Ordering::Release);
 341 |  644k |                     }
 342 |       |
 343 |       |                     // Read the value.
 344 |  666k |                     let slot = (*block).slots.get_unchecked(offset);
 345 |  666k |                     slot.wait_write();
 346 |  666k |                     let value = slot.value.get().read().assume_init();
 347 |  666k |
 348 |  666k |                     // Destroy the block if we've reached the end, or if another thread wanted to
 349 |  666k |                     // destroy but couldn't because we were busy reading from the slot.
 350 |  666k |                     if offset + 1 == BLOCK_CAP {
 351 | 21.7k |                         Block::destroy(block, 0);
 352 |  644k |                     } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
 353 |     1 |                         Block::destroy(block, offset + 1);
 354 |  651k |                     }
 355 |       |
 356 |  673k |                     return Some(value);
 357 |       |                 },
 358 | 10.5k |                 Err(h) => {
 359 | 10.5k |                     head = h;
 360 | 10.5k |                     block = self.head.block.load(Ordering::Acquire);
 361 | 10.5k |                     backoff.spin();
 362 | 10.5k |                 }
 363 |       |             }
 364 |       |         }
 365 |  737k |     }
Instantiations:
  <crossbeam_queue::seg_queue::SegQueue<usize>>::pop: 209k calls (18.1k returned None, 10.5k CAS retries)
  <crossbeam_queue::seg_queue::SegQueue<i32>>::pop: 3 calls (1 returned None)
  <crossbeam_queue::seg_queue::SegQueue<seg_queue::drops::DropCounter>>::pop: 521k calls (46.2k returned None, 0 CAS retries)
  <crossbeam_queue::seg_queue::SegQueue<()>>::pop: 1 call
 366 |       |
 367 |       |     /// Returns `true` if the queue is empty.
 368 |       |     ///
 369 |       |     /// # Examples
 370 |       |     ///
 371 |       |     /// ```
 372 |       |     /// use crossbeam_queue::SegQueue;
 373 |       |     ///
 374 |       |     /// let q = SegQueue::new();
 375 |       |     ///
 376 |       |     /// assert!(q.is_empty());
 377 |       |     /// q.push(1);
 378 |       |     /// assert!(!q.is_empty());
 379 |       |     /// ```
 380 |     3 |     pub fn is_empty(&self) -> bool {
 381 |     3 |         let head = self.head.index.load(Ordering::SeqCst);
 382 |     3 |         let tail = self.tail.index.load(Ordering::SeqCst);
 383 |     3 |         head >> SHIFT == tail >> SHIFT
 384 |     3 |     }
 385 |       |
 386 |       |     /// Returns the number of elements in the queue.
 387 |       |     ///
 388 |       |     /// # Examples
 389 |       |     ///
 390 |       |     /// ```
 391 |       |     /// use crossbeam_queue::SegQueue;
 392 |       |     ///
 393 |       |     /// let q = SegQueue::new();
 394 |       |     /// assert_eq!(q.len(), 0);
 395 |       |     ///
 396 |       |     /// q.push(10);
 397 |       |     /// assert_eq!(q.len(), 1);
 398 |       |     ///
 399 |       |     /// q.push(20);
 400 |       |     /// assert_eq!(q.len(), 2);
 401 |       |     /// ```
 402 |   105 |     pub fn len(&self) -> usize {
 403 |   105 |         loop {
 404 |   105 |             // Load the tail index, then load the head index.
 405 |   105 |             let mut tail = self.tail.index.load(Ordering::SeqCst);
 406 |   105 |             let mut head = self.head.index.load(Ordering::SeqCst);
 407 |   105 |
 408 |   105 |             // If the tail index didn't change, we've got consistent indices to work with.
 409 |   105 |             if self.tail.index.load(Ordering::SeqCst) == tail {
 410 |       |                 // Erase the lower bits.
 411 |   105 |                 tail &= !((1 << SHIFT) - 1);
 412 |   105 |                 head &= !((1 << SHIFT) - 1);
 413 |   105 |
 414 |   105 |                 // Fix up indices if they fall onto block ends.
 415 |   105 |                 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
 416 |     0 |                     tail = tail.wrapping_add(1 << SHIFT);
 417 |   105 |                 }
 418 |   105 |                 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
 419 |     0 |                     head = head.wrapping_add(1 << SHIFT);
 420 |   105 |                 }
 421 |       |
 422 |       |                 // Rotate indices so that head falls into the first block.
 423 |   105 |                 let lap = (head >> SHIFT) / LAP;
 424 |   105 |                 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
 425 |   105 |                 head = head.wrapping_sub((lap * LAP) << SHIFT);
 426 |   105 |
 427 |   105 |                 // Remove the lower bits.
 428 |   105 |                 tail >>= SHIFT;
 429 |   105 |                 head >>= SHIFT;
 430 |   105 |
 431 |   105 |                 // Return the difference minus the number of blocks between tail and head.
 432 |   105 |                 return tail - head - tail / LAP;
 433 |     0 |             }
 434 |       |         }
 435 |   105 |     }
<crossbeam_queue::seg_queue::SegQueue<usize>>::len
Line
Count
Source
402
102
    pub fn len(&self) -> usize {
403
102
        loop {
404
102
            // Load the tail index, then load the head index.
405
102
            let mut tail = self.tail.index.load(Ordering::SeqCst);
406
102
            let mut head = self.head.index.load(Ordering::SeqCst);
407
102
408
102
            // If the tail index didn't change, we've got consistent indices to work with.
409
102
            if self.tail.index.load(Ordering::SeqCst) == tail {
410
                // Erase the lower bits.
411
102
                tail &= !((1 << SHIFT) - 1);
412
102
                head &= !((1 << SHIFT) - 1);
413
102
414
102
                // Fix up indices if they fall onto block ends.
415
102
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
416
0
                    tail = tail.wrapping_add(1 << SHIFT);
417
102
                }
418
102
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
419
0
                    head = head.wrapping_add(1 << SHIFT);
420
102
                }
421
422
                // Rotate indices so that head falls into the first block.
423
102
                let lap = (head >> SHIFT) / LAP;
424
102
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
425
102
                head = head.wrapping_sub((lap * LAP) << SHIFT);
426
102
427
102
                // Remove the lower bits.
428
102
                tail >>= SHIFT;
429
102
                head >>= SHIFT;
430
102
431
102
                // Return the difference minus the number of blocks between tail and head.
432
102
                return tail - head - tail / LAP;
433
0
            }
434
        }
435
102
    }
<crossbeam_queue::seg_queue::SegQueue<()>>::len
Line
Count
Source
402
3
    pub fn len(&self) -> usize {
403
3
        loop {
404
3
            // Load the tail index, then load the head index.
405
3
            let mut tail = self.tail.index.load(Ordering::SeqCst);
406
3
            let mut head = self.head.index.load(Ordering::SeqCst);
407
3
408
3
            // If the tail index didn't change, we've got consistent indices to work with.
409
3
            if self.tail.index.load(Ordering::SeqCst) == tail {
410
                // Erase the lower bits.
411
3
                tail &= !((1 << SHIFT) - 1);
412
3
                head &= !((1 << SHIFT) - 1);
413
3
414
3
                // Fix up indices if they fall onto block ends.
415
3
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
416
0
                    tail = tail.wrapping_add(1 << SHIFT);
417
3
                }
418
3
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
419
0
                    head = head.wrapping_add(1 << SHIFT);
420
3
                }
421
422
                // Rotate indices so that head falls into the first block.
423
3
                let lap = (head >> SHIFT) / LAP;
424
3
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
425
3
                head = head.wrapping_sub((lap * LAP) << SHIFT);
426
3
427
3
                // Remove the lower bits.
428
3
                tail >>= SHIFT;
429
3
                head >>= SHIFT;
430
3
431
3
                // Return the difference minus the number of blocks between tail and head.
432
3
                return tail - head - tail / LAP;
433
0
            }
434
        }
435
3
    }
436
}
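
A worked example (a sketch, not crate code) of the closing `len` arithmetic: after the rotation above, one position per lap is an unused end-of-block placeholder, and `tail / LAP` counts exactly those placeholders.

    const LAP: usize = 32;

    fn main() {
        // head rotated to 0; 40 values pushed: 31 fill block 0
        // (positions 0..=30; position 31 is the placeholder), the
        // remaining 9 occupy positions 32..=40, so tail = 41.
        let (head, tail) = (0usize, 41usize);
        // One placeholder per lap below tail: tail / LAP == 1.
        assert_eq!(tail - head - tail / LAP, 40);
    }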
437
438
impl<T> Drop for SegQueue<T> {
439
105
    fn drop(&mut self) {
440
105
        let mut head = self.head.index.load(Ordering::Relaxed);
441
105
        let mut tail = self.tail.index.load(Ordering::Relaxed);
442
105
        let mut block = self.head.block.load(Ordering::Relaxed);
443
105
444
105
        // Erase the lower bits.
445
105
        head &= !((1 << SHIFT) - 1);
446
105
        tail &= !((1 << SHIFT) - 1);
447
448
        unsafe {
449
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
450
52.6k
            while head != tail {
451
52.5k
                let offset = (head >> SHIFT) % LAP;
452
52.5k
453
52.5k
                if offset < BLOCK_CAP {
454
50.8k
                    // Drop the value in the slot.
455
50.8k
                    let slot = (*block).slots.get_unchecked(offset);
456
50.8k
                    let p = &mut *slot.value.get();
457
50.8k
                    p.as_mut_ptr().drop_in_place();
458
50.8k
                } else {
459
1.64k
                    // Deallocate the block and move to the next one.
460
1.64k
                    let next = (*block).next.load(Ordering::Relaxed);
461
1.64k
                    drop(Box::from_raw(block));
462
1.64k
                    block = next;
463
1.64k
                }
464
465
52.5k
                head = head.wrapping_add(1 << SHIFT);
466
            }
467
468
            // Deallocate the last remaining block.
469
105
            if !block.is_null() {
470
105
                drop(Box::from_raw(block));
471
105
            }
0
472
        }
473
105
    }
<crossbeam_queue::seg_queue::SegQueue<()> as core::ops::drop::Drop>::drop
Line
Count
Source
439
1
    fn drop(&mut self) {
440
1
        let mut head = self.head.index.load(Ordering::Relaxed);
441
1
        let mut tail = self.tail.index.load(Ordering::Relaxed);
442
1
        let mut block = self.head.block.load(Ordering::Relaxed);
443
1
444
1
        // Erase the lower bits.
445
1
        head &= !((1 << SHIFT) - 1);
446
1
        tail &= !((1 << SHIFT) - 1);
447
448
        unsafe {
449
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
450
1
            while head != tail {
451
0
                let offset = (head >> SHIFT) % LAP;
452
0
453
0
                if offset < BLOCK_CAP {
454
0
                    // Drop the value in the slot.
455
0
                    let slot = (*block).slots.get_unchecked(offset);
456
0
                    let p = &mut *slot.value.get();
457
0
                    p.as_mut_ptr().drop_in_place();
458
0
                } else {
459
0
                    // Deallocate the block and move to the next one.
460
0
                    let next = (*block).next.load(Ordering::Relaxed);
461
0
                    drop(Box::from_raw(block));
462
0
                    block = next;
463
0
                }
464
465
0
                head = head.wrapping_add(1 << SHIFT);
466
            }
467
468
            // Deallocate the last remaining block.
469
1
            if !block.is_null() {
470
1
                drop(Box::from_raw(block));
471
1
            }
0
472
        }
473
1
    }
<crossbeam_queue::seg_queue::SegQueue<seg_queue::drops::DropCounter> as core::ops::drop::Drop>::drop
Line
Count
Source
439
100
    fn drop(&mut self) {
440
100
        let mut head = self.head.index.load(Ordering::Relaxed);
441
100
        let mut tail = self.tail.index.load(Ordering::Relaxed);
442
100
        let mut block = self.head.block.load(Ordering::Relaxed);
443
100
444
100
        // Erase the lower bits.
445
100
        head &= !((1 << SHIFT) - 1);
446
100
        tail &= !((1 << SHIFT) - 1);
447
448
        unsafe {
449
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
450
52.6k
            while head != tail {
451
52.5k
                let offset = (head >> SHIFT) % LAP;
452
52.5k
453
52.5k
                if offset < BLOCK_CAP {
454
50.8k
                    // Drop the value in the slot.
455
50.8k
                    let slot = (*block).slots.get_unchecked(offset);
456
50.8k
                    let p = &mut *slot.value.get();
457
50.8k
                    p.as_mut_ptr().drop_in_place();
458
50.8k
                } else {
459
1.64k
                    // Deallocate the block and move to the next one.
460
1.64k
                    let next = (*block).next.load(Ordering::Relaxed);
461
1.64k
                    drop(Box::from_raw(block));
462
1.64k
                    block = next;
463
1.64k
                }
464
465
52.5k
                head = head.wrapping_add(1 << SHIFT);
466
            }
467
468
            // Deallocate the last remaining block.
469
100
            if !block.is_null() {
470
100
                drop(Box::from_raw(block));
471
100
            }
0
472
        }
473
100
    }
<crossbeam_queue::seg_queue::SegQueue<usize> as core::ops::drop::Drop>::drop
Line
Count
Source
439
3
    fn drop(&mut self) {
440
3
        let mut head = self.head.index.load(Ordering::Relaxed);
441
3
        let mut tail = self.tail.index.load(Ordering::Relaxed);
442
3
        let mut block = self.head.block.load(Ordering::Relaxed);
443
3
444
3
        // Erase the lower bits.
445
3
        head &= !((1 << SHIFT) - 1);
446
3
        tail &= !((1 << SHIFT) - 1);
447
448
        unsafe {
449
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
450
3
            while head != tail {
451
0
                let offset = (head >> SHIFT) % LAP;
452
0
453
0
                if offset < BLOCK_CAP {
454
0
                    // Drop the value in the slot.
455
0
                    let slot = (*block).slots.get_unchecked(offset);
456
0
                    let p = &mut *slot.value.get();
457
0
                    p.as_mut_ptr().drop_in_place();
458
0
                } else {
459
0
                    // Deallocate the block and move to the next one.
460
0
                    let next = (*block).next.load(Ordering::Relaxed);
461
0
                    drop(Box::from_raw(block));
462
0
                    block = next;
463
0
                }
464
465
0
                head = head.wrapping_add(1 << SHIFT);
466
            }
467
468
            // Deallocate the last remaining block.
469
3
            if !block.is_null() {
470
3
                drop(Box::from_raw(block));
471
3
            }
0
472
        }
473
3
    }
<crossbeam_queue::seg_queue::SegQueue<i32> as core::ops::drop::Drop>::drop
Line
Count
Source
439
1
    fn drop(&mut self) {
440
1
        let mut head = self.head.index.load(Ordering::Relaxed);
441
1
        let mut tail = self.tail.index.load(Ordering::Relaxed);
442
1
        let mut block = self.head.block.load(Ordering::Relaxed);
443
1
444
1
        // Erase the lower bits.
445
1
        head &= !((1 << SHIFT) - 1);
446
1
        tail &= !((1 << SHIFT) - 1);
447
448
        unsafe {
449
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
450
1
            while head != tail {
451
0
                let offset = (head >> SHIFT) % LAP;
452
0
453
0
                if offset < BLOCK_CAP {
454
0
                    // Drop the value in the slot.
455
0
                    let slot = (*block).slots.get_unchecked(offset);
456
0
                    let p = &mut *slot.value.get();
457
0
                    p.as_mut_ptr().drop_in_place();
458
0
                } else {
459
0
                    // Deallocate the block and move to the next one.
460
0
                    let next = (*block).next.load(Ordering::Relaxed);
461
0
                    drop(Box::from_raw(block));
462
0
                    block = next;
463
0
                }
464
465
0
                head = head.wrapping_add(1 << SHIFT);
466
            }
467
468
            // Deallocate the last remaining block.
469
1
            if !block.is_null() {
470
1
                drop(Box::from_raw(block));
471
1
            }
0
472
        }
473
1
    }
474
}
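
A hedged usage sketch of the Drop impl above: values still queued when the queue goes out of scope are dropped along with it. Rc is used here purely as a convenient drop counter, and the queue stays on one thread.

    use crossbeam_queue::SegQueue;
    use std::rc::Rc;

    fn main() {
        let marker = Rc::new(());
        {
            let q = SegQueue::new();
            q.push(marker.clone());
            q.push(marker.clone());
            // Both clones are alive inside the queue's blocks.
            assert_eq!(Rc::strong_count(&marker), 3);
        } // `Drop for SegQueue` runs: queued values dropped, blocks freed.
        assert_eq!(Rc::strong_count(&marker), 1);
    }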
475
476
impl<T> fmt::Debug for SegQueue<T> {
477
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
478
        f.pad("SegQueue { .. }")
479
    }
480
}
481
482
impl<T> Default for SegQueue<T> {
483
    fn default() -> SegQueue<T> {
484
        SegQueue::new()
485
    }
486
}
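
Finally, a small sketch noting that `Default` simply delegates to `new`, so the two constructions are interchangeable (in this snapshot of the crate, `pop` returns an `Option`):

    use crossbeam_queue::SegQueue;

    fn main() {
        let a: SegQueue<i32> = SegQueue::new();
        let b: SegQueue<i32> = SegQueue::default();
        a.push(1);
        b.push(1);
        assert_eq!(a.pop(), b.pop()); // both Some(1)
    }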