Coverage Report

Created: 2021-01-22 16:54

crossbeam-epoch/src/sync/queue.rs
Line
Count
Source (jump to first uncovered line)
1
//! Michael-Scott lock-free queue.
2
//!
3
//! Usable with any number of producers and consumers.
4
//!
5
//! Michael and Scott.  Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
6
//! Algorithms.  PODC 1996.  <http://dl.acm.org/citation.cfm?id=248106>
7
//!
8
//! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
9
//! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7>
10
11
use core::mem::MaybeUninit;
12
use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
13
14
use crossbeam_utils::CachePadded;
15
16
use crate::{unprotected, Atomic, Guard, Owned, Shared};
17
18
// The representation here is a singly-linked list, with a sentinel node at the front. In general
19
// the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or
20
// all `Blocked` (requests for data from blocked threads).
21
0
#[derive(Debug)]
22
pub(crate) struct Queue<T> {
23
    head: CachePadded<Atomic<Node<T>>>,
24
    tail: CachePadded<Atomic<Node<T>>>,
25
}
26
27
struct Node<T> {
28
    /// The slot in which a value of type `T` can be stored.
29
    ///
30
    /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
31
    /// For example, the sentinel node in a queue never contains a value: its slot is always empty.
32
    /// Other nodes start their life with a push operation and contain a value until it gets popped
33
    /// out. After that such empty nodes get added to the collector for destruction.
34
    data: MaybeUninit<T>,
35
36
    next: Atomic<Node<T>>,
37
}
38
39
// Any particular `T` should never be accessed concurrently, so no need for `Sync`.
40
unsafe impl<T: Send> Sync for Queue<T> {}
41
unsafe impl<T: Send> Send for Queue<T> {}
42
43
impl<T> Queue<T> {
44
    /// Create a new, empty queue.
45
70
    pub(crate) fn new() -> Queue<T> {
46
70
        let q = Queue {
47
70
            head: CachePadded::new(Atomic::null()),
48
70
            tail: CachePadded::new(Atomic::null()),
49
70
        };
50
70
        let sentinel = Owned::new(Node {
51
70
            data: MaybeUninit::uninit(),
52
70
            next: Atomic::null(),
53
70
        });
54
70
        unsafe {
55
70
            let guard = unprotected();
56
70
            let sentinel = sentinel.into_shared(guard);
57
70
            q.head.store(sentinel, Relaxed);
58
70
            q.tail.store(sentinel, Relaxed);
59
70
            q
60
70
        }
61
70
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::new
Line
Count
Source
45
43
    pub(crate) fn new() -> Queue<T> {
46
43
        let q = Queue {
47
43
            head: CachePadded::new(Atomic::null()),
48
43
            tail: CachePadded::new(Atomic::null()),
49
43
        };
50
43
        let sentinel = Owned::new(Node {
51
43
            data: MaybeUninit::uninit(),
52
43
            next: Atomic::null(),
53
43
        });
54
43
        unsafe {
55
43
            let guard = unprotected();
56
43
            let sentinel = sentinel.into_shared(guard);
57
43
            q.head.store(sentinel, Relaxed);
58
43
            q.tail.store(sentinel, Relaxed);
59
43
            q
60
43
        }
61
43
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::new
Line
Count
Source
45
16
    pub(crate) fn new() -> Queue<T> {
46
16
        let q = Queue {
47
16
            head: CachePadded::new(Atomic::null()),
48
16
            tail: CachePadded::new(Atomic::null()),
49
16
        };
50
16
        let sentinel = Owned::new(Node {
51
16
            data: MaybeUninit::uninit(),
52
16
            next: Atomic::null(),
53
16
        });
54
16
        unsafe {
55
16
            let guard = unprotected();
56
16
            let sentinel = sentinel.into_shared(guard);
57
16
            q.head.store(sentinel, Relaxed);
58
16
            q.tail.store(sentinel, Relaxed);
59
16
            q
60
16
        }
61
16
    }
<crossbeam_epoch::sync::queue::Queue<i64>>::new
Line
Count
Source
45
10
    pub(crate) fn new() -> Queue<T> {
46
10
        let q = Queue {
47
10
            head: CachePadded::new(Atomic::null()),
48
10
            tail: CachePadded::new(Atomic::null()),
49
10
        };
50
10
        let sentinel = Owned::new(Node {
51
10
            data: MaybeUninit::uninit(),
52
10
            next: Atomic::null(),
53
10
        });
54
10
        unsafe {
55
10
            let guard = unprotected();
56
10
            let sentinel = sentinel.into_shared(guard);
57
10
            q.head.store(sentinel, Relaxed);
58
10
            q.tail.store(sentinel, Relaxed);
59
10
            q
60
10
        }
61
10
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::new
Line
Count
Source
45
1
    pub(crate) fn new() -> Queue<T> {
46
1
        let q = Queue {
47
1
            head: CachePadded::new(Atomic::null()),
48
1
            tail: CachePadded::new(Atomic::null()),
49
1
        };
50
1
        let sentinel = Owned::new(Node {
51
1
            data: MaybeUninit::uninit(),
52
1
            next: Atomic::null(),
53
1
        });
54
1
        unsafe {
55
1
            let guard = unprotected();
56
1
            let sentinel = sentinel.into_shared(guard);
57
1
            q.head.store(sentinel, Relaxed);
58
1
            q.tail.store(sentinel, Relaxed);
59
1
            q
60
1
        }
61
1
    }
62
63
    /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on
64
    /// success. The queue's `tail` pointer may be updated.
65
    #[inline(always)]
66
2.88M
    fn push_internal(
67
2.88M
        &self,
68
2.88M
        onto: Shared<'_, Node<T>>,
69
2.88M
        new: Shared<'_, Node<T>>,
70
2.88M
        guard: &Guard,
71
2.88M
    ) -> bool {
72
2.88M
        // is `onto` the actual tail?
73
2.88M
        let o = unsafe { onto.deref() };
74
2.88M
        let next = o.next.load(Acquire, guard);
75
2.88M
        if unsafe { next.as_ref().is_some() } {
76
            // if not, try to "help" by moving the tail pointer forward
77
331
            let _ = self
78
331
                .tail
79
331
                .compare_exchange(onto, next, Release, Relaxed, guard);
80
331
            false
81
        } else {
82
            // looks like the actual tail; attempt to link in `n`
83
2.88M
            let result = o
84
2.88M
                .next
85
2.88M
                .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
86
2.88M
                .is_ok();
87
2.92M
            if result {
88
2.92M
                // try to move the tail pointer forward
89
2.92M
                let _ = self
90
2.92M
                    .tail
91
2.92M
                    .compare_exchange(onto, new, Release, Relaxed, guard);
92
2.92M
            }
267
93
2.92M
            result
94
        }
95
2.92M
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::push_internal
Line
Count
Source
66
236
    fn push_internal(
67
236
        &self,
68
236
        onto: Shared<'_, Node<T>>,
69
236
        new: Shared<'_, Node<T>>,
70
236
        guard: &Guard,
71
236
    ) -> bool {
72
236
        // is `onto` the actual tail?
73
236
        let o = unsafe { onto.deref() };
74
236
        let next = o.next.load(Acquire, guard);
75
236
        if unsafe { next.as_ref().is_some() } {
76
            // if not, try to "help" by moving the tail pointer forward
77
11
            let _ = self
78
11
                .tail
79
11
                .compare_exchange(onto, next, Release, Relaxed, guard);
80
11
            false
81
        } else {
82
            // looks like the actual tail; attempt to link in `n`
83
225
            let result = o
84
225
                .next
85
225
                .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
86
225
                .is_ok();
87
225
            if result {
88
220
                // try to move the tail pointer forward
89
220
                let _ = self
90
220
                    .tail
91
220
                    .compare_exchange(onto, new, Release, Relaxed, guard);
92
220
            }
6
93
226
            result
94
        }
95
237
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::push_internal
Line
Count
Source
66
4
    fn push_internal(
67
4
        &self,
68
4
        onto: Shared<'_, Node<T>>,
69
4
        new: Shared<'_, Node<T>>,
70
4
        guard: &Guard,
71
4
    ) -> bool {
72
4
        // is `onto` the actual tail?
73
4
        let o = unsafe { onto.deref() };
74
4
        let next = o.next.load(Acquire, guard);
75
4
        if unsafe { next.as_ref().is_some() } {
76
            // if not, try to "help" by moving the tail pointer forward
77
0
            let _ = self
78
0
                .tail
79
0
                .compare_exchange(onto, next, Release, Relaxed, guard);
80
0
            false
81
        } else {
82
            // looks like the actual tail; attempt to link in `n`
83
4
            let result = o
84
4
                .next
85
4
                .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
86
4
                .is_ok();
87
4
            if result {
88
4
                // try to move the tail pointer forward
89
4
                let _ = self
90
4
                    .tail
91
4
                    .compare_exchange(onto, new, Release, Relaxed, guard);
92
4
            }
0
93
4
            result
94
        }
95
4
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::push_internal
Line
Count
Source
66
65.4k
    fn push_internal(
67
65.4k
        &self,
68
65.4k
        onto: Shared<'_, Node<T>>,
69
65.4k
        new: Shared<'_, Node<T>>,
70
65.4k
        guard: &Guard,
71
65.4k
    ) -> bool {
72
65.4k
        // is `onto` the actual tail?
73
65.4k
        let o = unsafe { onto.deref() };
74
65.4k
        let next = o.next.load(Acquire, guard);
75
65.4k
        if unsafe { next.as_ref().is_some() } {
76
            // if not, try to "help" by moving the tail pointer forward
77
320
            let _ = self
78
320
                .tail
79
320
                .compare_exchange(onto, next, Release, Relaxed, guard);
80
320
            false
81
        } else {
82
            // looks like the actual tail; attempt to link in `n`
83
65.1k
            let result = o
84
65.1k
                .next
85
65.1k
                .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
86
65.1k
                .is_ok();
87
65.1k
            if result {
88
64.9k
                // try to move the tail pointer forward
89
64.9k
                let _ = self
90
64.9k
                    .tail
91
64.9k
                    .compare_exchange(onto, new, Release, Relaxed, guard);
92
64.9k
            }
261
93
65.2k
            result
94
        }
95
65.5k
    }
<crossbeam_epoch::sync::queue::Queue<i64>>::push_internal
Line
Count
Source
66
2.81M
    fn push_internal(
67
2.81M
        &self,
68
2.81M
        onto: Shared<'_, Node<T>>,
69
2.81M
        new: Shared<'_, Node<T>>,
70
2.81M
        guard: &Guard,
71
2.81M
    ) -> bool {
72
2.81M
        // is `onto` the actual tail?
73
2.81M
        let o = unsafe { onto.deref() };
74
2.81M
        let next = o.next.load(Acquire, guard);
75
2.81M
        if unsafe { next.as_ref().is_some() } {
76
            // if not, try to "help" by moving the tail pointer forward
77
0
            let _ = self
78
0
                .tail
79
0
                .compare_exchange(onto, next, Release, Relaxed, guard);
80
0
            false
81
        } else {
82
            // looks like the actual tail; attempt to link in `n`
83
2.81M
            let result = o
84
2.81M
                .next
85
2.81M
                .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
86
2.81M
                .is_ok();
87
2.85M
            if result {
88
2.85M
                // try to move the tail pointer forward
89
2.85M
                let _ = self
90
2.85M
                    .tail
91
2.85M
                    .compare_exchange(onto, new, Release, Relaxed, guard);
92
2.85M
            }
0
93
2.85M
            result
94
        }
95
2.85M
    }
96
97
    /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`.
98
2.92M
    pub(crate) fn push(&self, t: T, guard: &Guard) {
99
2.92M
        let new = Owned::new(Node {
100
2.92M
            data: MaybeUninit::new(t),
101
2.92M
            next: Atomic::null(),
102
2.92M
        });
103
2.92M
        let new = Owned::into_shared(new, guard);
104
105
2.92M
        loop {
106
2.92M
            // We push onto the tail, so we'll start optimistically by looking there first.
107
2.92M
            let tail = self.tail.load(Acquire, guard);
108
2.92M
109
2.92M
            // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
110
2.92M
            if self.push_internal(tail, new, guard) {
111
2.92M
                break;
112
598
            }
113
        }
114
2.92M
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::push
Line
Count
Source
98
219
    pub(crate) fn push(&self, t: T, guard: &Guard) {
99
219
        let new = Owned::new(Node {
100
219
            data: MaybeUninit::new(t),
101
219
            next: Atomic::null(),
102
219
        });
103
219
        let new = Owned::into_shared(new, guard);
104
105
236
        loop {
106
236
            // We push onto the tail, so we'll start optimistically by looking there first.
107
236
            let tail = self.tail.load(Acquire, guard);
108
236
109
236
            // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
110
236
            if self.push_internal(tail, new, guard) {
111
219
                break;
112
17
            }
113
        }
114
219
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::push
Line
Count
Source
98
64.9k
    pub(crate) fn push(&self, t: T, guard: &Guard) {
99
64.9k
        let new = Owned::new(Node {
100
64.9k
            data: MaybeUninit::new(t),
101
64.9k
            next: Atomic::null(),
102
64.9k
        });
103
64.9k
        let new = Owned::into_shared(new, guard);
104
105
65.5k
        loop {
106
65.5k
            // We push onto the tail, so we'll start optimistically by looking there first.
107
65.5k
            let tail = self.tail.load(Acquire, guard);
108
65.5k
109
65.5k
            // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
110
65.5k
            if self.push_internal(tail, new, guard) {
111
64.9k
                break;
112
581
            }
113
        }
114
64.9k
    }
<crossbeam_epoch::sync::queue::Queue<i64>>::push
Line
Count
Source
98
2.86M
    pub(crate) fn push(&self, t: T, guard: &Guard) {
99
2.86M
        let new = Owned::new(Node {
100
2.86M
            data: MaybeUninit::new(t),
101
2.86M
            next: Atomic::null(),
102
2.86M
        });
103
2.86M
        let new = Owned::into_shared(new, guard);
104
105
2.86M
        loop {
106
2.86M
            // We push onto the tail, so we'll start optimistically by looking there first.
107
2.86M
            let tail = self.tail.load(Acquire, guard);
108
2.86M
109
2.86M
            // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
110
2.86M
            if self.push_internal(tail, new, guard) {
111
2.86M
                break;
112
0
            }
113
        }
114
2.86M
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::push
Line
Count
Source
98
4
    pub(crate) fn push(&self, t: T, guard: &Guard) {
99
4
        let new = Owned::new(Node {
100
4
            data: MaybeUninit::new(t),
101
4
            next: Atomic::null(),
102
4
        });
103
4
        let new = Owned::into_shared(new, guard);
104
105
4
        loop {
106
4
            // We push onto the tail, so we'll start optimistically by looking there first.
107
4
            let tail = self.tail.load(Acquire, guard);
108
4
109
4
            // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
110
4
            if self.push_internal(tail, new, guard) {
111
4
                break;
112
0
            }
113
        }
114
4
    }
115
116
    /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop.
117
    #[inline(always)]
118
7.24M
    fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
119
7.24M
        let head = self.head.load(Acquire, guard);
120
7.24M
        let h = unsafe { head.deref() };
121
7.24M
        let next = h.next.load(Acquire, guard);
122
7.24M
        match unsafe { next.as_ref() } {
123
3.20M
            Some(n) => unsafe {
124
3.20M
                self.head
125
3.20M
                    .compare_exchange(head, next, Release, Relaxed, guard)
126
3.20M
                    .map(|_| {
127
2.90M
                        let tail = self.tail.load(Relaxed, guard);
128
2.90M
                        // Advance the tail so that we don't retire a pointer to a reachable node.
129
2.90M
                        if head == tail {
130
281
                            let _ = self
131
281
                                .tail
132
281
                                .compare_exchange(tail, next, Release, Relaxed, guard);
133
2.90M
                        }
134
2.90M
                        guard.defer_destroy(head);
135
2.90M
                        // TODO: Replace with MaybeUninit::read when api is stable
136
2.90M
                        Some(n.data.as_ptr().read())
137
3.20M
                    })
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_internal::{closure#0}
Line
Count
Source
126
2
                    .map(|_| {
127
2
                        let tail = self.tail.load(Relaxed, guard);
128
2
                        // Advance the tail so that we don't retire a pointer to a reachable node.
129
2
                        if head == tail {
130
0
                            let _ = self
131
0
                                .tail
132
0
                                .compare_exchange(tail, next, Release, Relaxed, guard);
133
2
                        }
134
2
                        guard.defer_destroy(head);
135
2
                        // TODO: Replace with MaybeUninit::read when api is stable
136
2
                        Some(n.data.as_ptr().read())
137
2
                    })
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::pop_internal::{closure#0}
Line
Count
Source
126
4
                    .map(|_| {
127
4
                        let tail = self.tail.load(Relaxed, guard);
128
4
                        // Advance the tail so that we don't retire a pointer to a reachable node.
129
4
                        if head == tail {
130
0
                            let _ = self
131
0
                                .tail
132
0
                                .compare_exchange(tail, next, Release, Relaxed, guard);
133
4
                        }
134
4
                        guard.defer_destroy(head);
135
4
                        // TODO: Replace with MaybeUninit::read when api is stable
136
4
                        Some(n.data.as_ptr().read())
137
4
                    })
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_internal::{closure#0}
Line
Count
Source
126
221
                    .map(|_| {
127
221
                        let tail = self.tail.load(Relaxed, guard);
128
221
                        // Advance the tail so that we don't retire a pointer to a reachable node.
129
221
                        if head == tail {
130
0
                            let _ = self
131
0
                                .tail
132
0
                                .compare_exchange(tail, next, Release, Relaxed, guard);
133
221
                        }
134
221
                        guard.defer_destroy(head);
135
221
                        // TODO: Replace with MaybeUninit::read when api is stable
136
221
                        Some(n.data.as_ptr().read())
137
221
                    })
<crossbeam_epoch::sync::queue::Queue<i64>>::pop_internal::{closure#0}
Line
Count
Source
126
2.90M
                    .map(|_| {
127
2.90M
                        let tail = self.tail.load(Relaxed, guard);
128
2.90M
                        // Advance the tail so that we don't retire a pointer to a reachable node.
129
2.90M
                        if head == tail {
130
281
                            let _ = self
131
281
                                .tail
132
281
                                .compare_exchange(tail, next, Release, Relaxed, guard);
133
2.90M
                        }
134
2.90M
                        guard.defer_destroy(head);
135
2.90M
                        // TODO: Replace with MaybeUninit::read when api is stable
136
2.90M
                        Some(n.data.as_ptr().read())
137
2.90M
                    })
138
3.20M
                    .map_err(|_| 
()394k
)
139
            },
140
4.04M
            None => Ok(None),
141
        }
142
7.24M
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_internal
Line
Count
Source
118
4
    fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
119
4
        let head = self.head.load(Acquire, guard);
120
4
        let h = unsafe { head.deref() };
121
4
        let next = h.next.load(Acquire, guard);
122
4
        match unsafe { next.as_ref() } {
123
2
            Some(n) => unsafe {
124
2
                self.head
125
2
                    .compare_exchange(head, next, Release, Relaxed, guard)
126
2
                    .map(|_| {
127
                        let tail = self.tail.load(Relaxed, guard);
128
                        // Advance the tail so that we don't retire a pointer to a reachable node.
129
                        if head == tail {
130
                            let _ = self
131
                                .tail
132
                                .compare_exchange(tail, next, Release, Relaxed, guard);
133
                        }
134
                        guard.defer_destroy(head);
135
                        // TODO: Replace with MaybeUninit::read when api is stable
136
                        Some(n.data.as_ptr().read())
137
2
                    })
138
2
                    .map_err(|_| ())
139
            },
140
2
            None => Ok(None),
141
        }
142
4
    }
<crossbeam_epoch::sync::queue::Queue<i64>>::pop_internal
Line
Count
Source
118
5.28M
    fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
119
5.28M
        let head = self.head.load(Acquire, guard);
120
5.28M
        let h = unsafe { head.deref() };
121
5.28M
        let next = h.next.load(Acquire, guard);
122
5.28M
        match unsafe { next.as_ref() } {
123
3.20M
            Some(n) => unsafe {
124
3.20M
                self.head
125
3.20M
                    .compare_exchange(head, next, Release, Relaxed, guard)
126
3.20M
                    .map(|_| {
127
                        let tail = self.tail.load(Relaxed, guard);
128
                        // Advance the tail so that we don't retire a pointer to a reachable node.
129
                        if head == tail {
130
                            let _ = self
131
                                .tail
132
                                .compare_exchange(tail, next, Release, Relaxed, guard);
133
                        }
134
                        guard.defer_destroy(head);
135
                        // TODO: Replace with MaybeUninit::read when api is stable
136
                        Some(n.data.as_ptr().read())
137
3.20M
                    })
138
3.20M
                    .map_err(|_| ())
139
            },
140
2.08M
            None => Ok(None),
141
        }
142
5.28M
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::pop_internal
Line
Count
Source
118
1.96M
    fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
119
1.96M
        let head = self.head.load(Acquire, guard);
120
1.96M
        let h = unsafe { head.deref() };
121
1.96M
        let next = h.next.load(Acquire, guard);
122
1.96M
        match unsafe { next.as_ref() } {
123
4
            Some(n) => unsafe {
124
4
                self.head
125
4
                    .compare_exchange(head, next, Release, Relaxed, guard)
126
4
                    .map(|_| {
127
                        let tail = self.tail.load(Relaxed, guard);
128
                        // Advance the tail so that we don't retire a pointer to a reachable node.
129
                        if head == tail {
130
                            let _ = self
131
                                .tail
132
                                .compare_exchange(tail, next, Release, Relaxed, guard);
133
                        }
134
                        guard.defer_destroy(head);
135
                        // TODO: Replace with MaybeUninit::read when api is stable
136
                        Some(n.data.as_ptr().read())
137
4
                    })
138
4
                    .map_err(|_| ())
139
            },
140
1.96M
            None => Ok(None),
141
        }
142
1.96M
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_internal
Line
Count
Source
118
236
    fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
119
236
        let head = self.head.load(Acquire, guard);
120
236
        let h = unsafe { head.deref() };
121
236
        let next = h.next.load(Acquire, guard);
122
236
        match unsafe { next.as_ref() } {
123
221
            Some(n) => unsafe {
124
221
                self.head
125
221
                    .compare_exchange(head, next, Release, Relaxed, guard)
126
221
                    .map(|_| {
127
                        let tail = self.tail.load(Relaxed, guard);
128
                        // Advance the tail so that we don't retire a pointer to a reachable node.
129
                        if head == tail {
130
                            let _ = self
131
                                .tail
132
                                .compare_exchange(tail, next, Release, Relaxed, guard);
133
                        }
134
                        guard.defer_destroy(head);
135
                        // TODO: Replace with MaybeUninit::read when api is stable
136
                        Some(n.data.as_ptr().read())
137
221
                    })
138
221
                    .map_err(|_| ())
139
            },
140
15
            None => Ok(None),
141
        }
142
236
    }
143
144
    /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue
145
    /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop.
146
    #[inline(always)]
147
3.74M
    fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
148
3.74M
    where
149
3.74M
        T: Sync,
150
3.74M
        F: Fn(&T) -> bool,
151
3.74M
    {
152
3.74M
        let head = self.head.load(Acquire, guard);
153
3.74M
        let h = unsafe { head.deref() };
154
3.74M
        let next = h.next.load(Acquire, guard);
155
3.74M
        match unsafe { next.as_ref() } {
156
162k
            Some(
n66.1k
) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
157
66.1k
                self.head
158
66.1k
                    .compare_exchange(head, next, Release, Relaxed, guard)
159
66.1k
                    .map(|_| {
160
64.8k
                        let tail = self.tail.load(Relaxed, guard);
161
64.8k
                        // Advance the tail so that we don't retire a pointer to a reachable node.
162
64.8k
                        if head == tail {
163
0
                            let _ = self
164
0
                                .tail
165
0
                                .compare_exchange(tail, next, Release, Relaxed, guard);
166
64.8k
                        }
167
64.8k
                        guard.defer_destroy(head);
168
64.8k
                        Some(n.data.as_ptr().read())
169
66.1k
                    })
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}>::{closure#0}
Line
Count
Source
159
165
                    .map(|_| {
160
165
                        let tail = self.tail.load(Relaxed, guard);
161
165
                        // Advance the tail so that we don't retire a pointer to a reachable node.
162
165
                        if head == tail {
163
0
                            let _ = self
164
0
                                .tail
165
0
                                .compare_exchange(tail, next, Release, Relaxed, guard);
166
165
                        }
167
165
                        guard.defer_destroy(head);
168
165
                        Some(n.data.as_ptr().read())
169
165
                    })
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}>::{closure#0}
Line
Count
Source
159
64.7k
                    .map(|_| {
160
64.7k
                        let tail = self.tail.load(Relaxed, guard);
161
64.7k
                        // Advance the tail so that we don't retire a pointer to a reachable node.
162
64.7k
                        if head == tail {
163
0
                            let _ = self
164
0
                                .tail
165
0
                                .compare_exchange(tail, next, Release, Relaxed, guard);
166
64.7k
                        }
167
64.7k
                        guard.defer_destroy(head);
168
64.7k
                        Some(n.data.as_ptr().read())
169
64.7k
                    })
170
66.1k
                    .map_err(|_| 
()1.37k
)
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}>::{closure#1}
Line
Count
Source
170
10
                    .map_err(|_| ())
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}>::{closure#1}
Line
Count
Source
170
1.36k
                    .map_err(|_| ())
171
            },
172
3.67M
            None | Some(_) => Ok(None),
173
        }
174
3.74M
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}>
Line
Count
Source
147
9.88k
    fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
148
9.88k
    where
149
9.88k
        T: Sync,
150
9.88k
        F: Fn(&T) -> bool,
151
9.88k
    {
152
9.88k
        let head = self.head.load(Acquire, guard);
153
9.88k
        let h = unsafe { head.deref() };
154
9.88k
        let next = h.next.load(Acquire, guard);
155
9.88k
        match unsafe { next.as_ref() } {
156
6.15k
            Some(
n175
) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
157
175
                self.head
158
175
                    .compare_exchange(head, next, Release, Relaxed, guard)
159
175
                    .map(|_| {
160
                        let tail = self.tail.load(Relaxed, guard);
161
                        // Advance the tail so that we don't retire a pointer to a reachable node.
162
                        if head == tail {
163
                            let _ = self
164
                                .tail
165
                                .compare_exchange(tail, next, Release, Relaxed, guard);
166
                        }
167
                        guard.defer_destroy(head);
168
                        Some(n.data.as_ptr().read())
169
175
                    })
170
175
                    .map_err(|_| ())
171
            },
172
9.70k
            None | Some(_) => Ok(None),
173
        }
174
9.87k
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::pop_if_internal::<&&<crossbeam_epoch::internal::Global>::collect::{closure#0}>
Line
Count
Source
147
3.73M
    fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
148
3.73M
    where
149
3.73M
        T: Sync,
150
3.73M
        F: Fn(&T) -> bool,
151
3.73M
    {
152
3.73M
        let head = self.head.load(Acquire, guard);
153
3.73M
        let h = unsafe { head.deref() };
154
3.73M
        let next = h.next.load(Acquire, guard);
155
3.73M
        match unsafe { next.as_ref() } {
156
156k
            Some(
n65.9k
) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
157
65.9k
                self.head
158
65.9k
                    .compare_exchange(head, next, Release, Relaxed, guard)
159
65.9k
                    .map(|_| {
160
                        let tail = self.tail.load(Relaxed, guard);
161
                        // Advance the tail so that we don't retire a pointer to a reachable node.
162
                        if head == tail {
163
                            let _ = self
164
                                .tail
165
                                .compare_exchange(tail, next, Release, Relaxed, guard);
166
                        }
167
                        guard.defer_destroy(head);
168
                        Some(n.data.as_ptr().read())
169
65.9k
                    })
170
65.9k
                    .map_err(|_| ())
171
            },
172
3.66M
            None | Some(_) => Ok(None),
173
        }
174
3.73M
    }
175
176
    /// Attempts to dequeue from the front.
177
    ///
178
    /// Returns `None` if the queue is observed to be empty.
179
6.81M
    pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
180
        loop {
181
7.21M
            if let Ok(
head6.81M
) = self.pop_internal(guard) {
182
6.81M
                return head;
183
394k
            }
184
        }
185
6.81M
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::try_pop
Line
Count
Source
179
4
    pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
180
        loop {
181
4
            if let Ok(head) = self.pop_internal(guard) {
182
4
                return head;
183
0
            }
184
        }
185
4
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::try_pop
Line
Count
Source
179
236
    pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
180
        loop {
181
236
            if let Ok(head) = self.pop_internal(guard) {
182
236
                return head;
183
0
            }
184
        }
185
236
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::try_pop
Line
Count
Source
179
1.95M
    pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
180
        loop {
181
1.95M
            if let Ok(head) = self.pop_internal(guard) {
182
1.95M
                return head;
183
0
            }
184
        }
185
1.95M
    }
<crossbeam_epoch::sync::queue::Queue<i64>>::try_pop
Line
Count
Source
179
4.86M
    pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
180
        loop {
181
5.25M
            if let Ok(
head4.86M
) = self.pop_internal(guard) {
182
4.86M
                return head;
183
394k
            }
184
        }
185
4.86M
    }
186
187
    /// Attempts to dequeue from the front, if the item satisfies the given condition.
188
    ///
189
    /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
190
    /// condition.
191
3.73M
    pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
192
3.73M
    where
193
3.73M
        T: Sync,
194
3.73M
        F: Fn(&T) -> bool,
195
3.73M
    {
196
        loop {
197
3.73M
            if let Ok(
head3.73M
) = self.pop_if_internal(&condition, guard) {
198
3.73M
                return head;
199
1.37k
            }
200
        }
201
3.73M
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::try_pop_if::<&<crossbeam_epoch::internal::Global>::collect::{closure#0}>
Line
Count
Source
191
9.87k
    pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
192
9.87k
    where
193
9.87k
        T: Sync,
194
9.87k
        F: Fn(&T) -> bool,
195
9.87k
    {
196
        loop {
197
9.88k
            if let Ok(
head9.87k
) = self.pop_if_internal(&condition, guard) {
198
9.87k
                return head;
199
10
            }
200
        }
201
9.87k
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag>>::try_pop_if::<&<crossbeam_epoch::internal::Global>::collect::{closure#0}>
Line
Count
Source
191
3.72M
    pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
192
3.72M
    where
193
3.72M
        T: Sync,
194
3.72M
        F: Fn(&T) -> bool,
195
3.72M
    {
196
        loop {
197
3.72M
            if let Ok(
head3.72M
) = self.pop_if_internal(&condition, guard) {
198
3.72M
                return head;
199
1.36k
            }
200
        }
201
3.72M
    }
202
}
203
204
impl<T> Drop for Queue<T> {
205
28
    fn drop(&mut self) {
206
        unsafe {
207
28
            let guard = unprotected();
208
209
161k
            while self.try_pop(guard).is_some() 
{}161k
210
211
            // Destroy the remaining sentinel node.
212
28
            let sentinel = self.head.load(Relaxed, guard);
213
28
            drop(sentinel.into_owned());
214
28
        }
215
28
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag> as core::ops::drop::Drop>::drop
Line
Count
Source
205
2
    fn drop(&mut self) {
206
        unsafe {
207
2
            let guard = unprotected();
208
209
4
            while self.try_pop(guard).is_some() 
{}2
210
211
            // Destroy the remaining sentinel node.
212
2
            let sentinel = self.head.load(Relaxed, guard);
213
2
            drop(sentinel.into_owned());
214
2
        }
215
2
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::internal::SealedBag> as core::ops::drop::Drop>::drop
Line
Count
Source
205
15
    fn drop(&mut self) {
206
        unsafe {
207
15
            let guard = unprotected();
208
209
236
            while self.try_pop(guard).is_some() 
{}221
210
211
            // Destroy the remaining sentinel node.
212
15
            let sentinel = self.head.load(Relaxed, guard);
213
15
            drop(sentinel.into_owned());
214
15
        }
215
15
    }
<crossbeam_epoch::sync::queue::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR> as core::ops::drop::Drop>::drop
Line
Count
Source
205
1
    fn drop(&mut self) {
206
        unsafe {
207
1
            let guard = unprotected();
208
209
1
            while self.try_pop(guard).is_some() 
{}0
210
211
            // Destroy the remaining sentinel node.
212
1
            let sentinel = self.head.load(Relaxed, guard);
213
1
            drop(sentinel.into_owned());
214
1
        }
215
1
    }
<crossbeam_epoch::sync::queue::Queue<i64> as core::ops::drop::Drop>::drop
Line
Count
Source
205
10
    fn drop(&mut self) {
206
        unsafe {
207
10
            let guard = unprotected();
208
209
161k
            while self.try_pop(guard).is_some() 
{}161k
210
211
            // Destroy the remaining sentinel node.
212
10
            let sentinel = self.head.load(Relaxed, guard);
213
10
            drop(sentinel.into_owned());
214
10
        }
215
10
    }
216
}
217
218
#[cfg(all(test, not(loom_crossbeam)))]
219
mod test {
220
    use super::*;
221
    use crate::pin;
222
    use crossbeam_utils::thread;
223
224
    struct Queue<T> {
225
        queue: super::Queue<T>,
226
    }
227
228
    impl<T> Queue<T> {
229
11
        pub(crate) fn new() -> Queue<T> {
230
11
            Queue {
231
11
                queue: super::Queue::new(),
232
11
            }
233
11
        }
<crossbeam_epoch::sync::queue::test::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::new
Line
Count
Source
229
1
        pub(crate) fn new() -> Queue<T> {
230
1
            Queue {
231
1
                queue: super::Queue::new(),
232
1
            }
233
1
        }
<crossbeam_epoch::sync::queue::test::Queue<i64>>::new
Line
Count
Source
229
10
        pub(crate) fn new() -> Queue<T> {
230
10
            Queue {
231
10
                queue: super::Queue::new(),
232
10
            }
233
10
        }
234
235
2.85M
        pub(crate) fn push(&self, t: T) {
236
2.85M
            let guard = &pin();
237
2.85M
            self.queue.push(t, guard);
238
2.85M
        }
<crossbeam_epoch::sync::queue::test::Queue<i64>>::push
Line
Count
Source
235
2.85M
        pub(crate) fn push(&self, t: T) {
236
2.85M
            let guard = &pin();
237
2.85M
            self.queue.push(t, guard);
238
2.85M
        }
<crossbeam_epoch::sync::queue::test::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::push
Line
Count
Source
235
4
        pub(crate) fn push(&self, t: T) {
236
4
            let guard = &pin();
237
4
            self.queue.push(t, guard);
238
4
        }
239
240
21
        pub(crate) fn is_empty(&self) -> bool {
241
21
            let guard = &pin();
242
21
            let head = self.queue.head.load(Acquire, guard);
243
21
            let h = unsafe { head.deref() };
244
21
            h.next.load(Acquire, guard).is_null()
245
21
        }
<crossbeam_epoch::sync::queue::test::Queue<i64>>::is_empty
Line
Count
Source
240
20
        pub(crate) fn is_empty(&self) -> bool {
241
20
            let guard = &pin();
242
20
            let head = self.queue.head.load(Acquire, guard);
243
20
            let h = unsafe { head.deref() };
244
20
            h.next.load(Acquire, guard).is_null()
245
20
        }
<crossbeam_epoch::sync::queue::test::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::is_empty
Line
Count
Source
240
1
        pub(crate) fn is_empty(&self) -> bool {
241
1
            let guard = &pin();
242
1
            let head = self.queue.head.load(Acquire, guard);
243
1
            let h = unsafe { head.deref() };
244
1
            h.next.load(Acquire, guard).is_null()
245
1
        }
246
247
6.71M
        pub(crate) fn try_pop(&self) -> Option<T> {
248
6.71M
            let guard = &pin();
249
6.71M
            self.queue.try_pop(guard)
250
6.71M
        }
<crossbeam_epoch::sync::queue::test::Queue<i64>>::try_pop
Line
Count
Source
247
4.74M
        pub(crate) fn try_pop(&self) -> Option<T> {
248
4.74M
            let guard = &pin();
249
4.74M
            self.queue.try_pop(guard)
250
4.74M
        }
<crossbeam_epoch::sync::queue::test::Queue<crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::LR>>::try_pop
Line
Count
Source
247
1.96M
        pub(crate) fn try_pop(&self) -> Option<T> {
248
1.96M
            let guard = &pin();
249
1.96M
            self.queue.try_pop(guard)
250
1.96M
        }
251
252
1.00M
        pub(crate) fn pop(&self) -> T {
253
1.01M
            loop {
254
1.01M
                match self.try_pop() {
255
1.01M
                    None => continue,
256
1.00M
                    Some(t) => return t,
257
1.00M
                }
258
1.00M
            }
259
1.00M
        }
260
    }
261
262
    const CONC_COUNT: i64 = 1000000;
263
264
    #[test]
265
1
    fn push_try_pop_1() {
crossbeam_epoch::sync::queue::test::push_try_pop_1::{closure#0}
Line
Count
Source
265
1
    fn push_try_pop_1() {
266
1
        let q: Queue<i64> = Queue::new();
267
1
        assert!(q.is_empty());
268
1
        q.push(37);
269
1
        assert!(!q.is_empty());
270
1
        assert_eq!(q.try_pop(), Some(37));
271
1
        assert!(q.is_empty());
272
1
    }
crossbeam_epoch::sync::queue::test::push_try_pop_1
Line
Count
Source
265
1
    fn push_try_pop_1() {
266
1
        let q: Queue<i64> = Queue::new();
267
1
        assert!(q.is_empty());
268
1
        q.push(37);
269
1
        assert!(!q.is_empty());
270
1
        assert_eq!(q.try_pop(), Some(37));
271
1
        assert!(q.is_empty());
272
1
    }
273
274
    #[test]
275
1
    fn push_try_pop_2() {
crossbeam_epoch::sync::queue::test::push_try_pop_2::{closure#0}
Line
Count
Source
275
1
    fn push_try_pop_2() {
276
1
        let q: Queue<i64> = Queue::new();
277
1
        assert!(q.is_empty());
278
1
        q.push(37);
279
1
        q.push(48);
280
1
        assert_eq!(q.try_pop(), Some(37));
281
1
        assert!(!q.is_empty());
282
1
        assert_eq!(q.try_pop(), Some(48));
283
1
        assert!(q.is_empty());
284
1
    }
crossbeam_epoch::sync::queue::test::push_try_pop_2
Line
Count
Source
275
1
    fn push_try_pop_2() {
276
1
        let q: Queue<i64> = Queue::new();
277
1
        assert!(q.is_empty());
278
1
        q.push(37);
279
1
        q.push(48);
280
1
        assert_eq!(q.try_pop(), Some(37));
281
1
        assert!(!q.is_empty());
282
1
        assert_eq!(q.try_pop(), Some(48));
283
1
        assert!(q.is_empty());
284
1
    }
285
286
    #[test]
287
1
    fn push_try_pop_many_seq() {
crossbeam_epoch::sync::queue::test::push_try_pop_many_seq::{closure#0}
Line
Count
Source
287
1
    fn push_try_pop_many_seq() {
288
1
        let q: Queue<i64> = Queue::new();
289
1
        assert!(q.is_empty());
290
200
        for i in 0..200 {
291
200
            q.push(i)
292
        }
293
1
        assert!(!q.is_empty());
294
200
        for i in 0..200 {
295
200
            assert_eq!(q.try_pop(), Some(i));
296
        }
297
1
        assert!(q.is_empty());
298
1
    }
crossbeam_epoch::sync::queue::test::push_try_pop_many_seq
Line
Count
Source
287
1
    fn push_try_pop_many_seq() {
288
1
        let q: Queue<i64> = Queue::new();
289
1
        assert!(q.is_empty());
290
200
        for i in 0..200 {
291
200
            q.push(i)
292
        }
293
1
        assert!(!q.is_empty());
294
200
        for i in 0..200 {
295
200
            assert_eq!(q.try_pop(), Some(i));
296
        }
297
1
        assert!(q.is_empty());
298
1
    }
299
300
    #[test]
301
1
    fn push_pop_1() {
crossbeam_epoch::sync::queue::test::push_pop_1::{closure#0}
Line
Count
Source
301
1
    fn push_pop_1() {
302
1
        let q: Queue<i64> = Queue::new();
303
1
        assert!(q.is_empty());
304
1
        q.push(37);
305
1
        assert!(!q.is_empty());
306
1
        assert_eq!(q.pop(), 37);
307
1
        assert!(q.is_empty());
308
1
    }
crossbeam_epoch::sync::queue::test::push_pop_1
Line
Count
Source
301
1
    fn push_pop_1() {
302
1
        let q: Queue<i64> = Queue::new();
303
1
        assert!(q.is_empty());
304
1
        q.push(37);
305
1
        assert!(!q.is_empty());
306
1
        assert_eq!(q.pop(), 37);
307
1
        assert!(q.is_empty());
308
1
    }
309
310
    #[test]
311
1
    fn push_pop_2() {
crossbeam_epoch::sync::queue::test::push_pop_2::{closure#0}
Line
Count
Source
311
1
    fn push_pop_2() {
312
1
        let q: Queue<i64> = Queue::new();
313
1
        q.push(37);
314
1
        q.push(48);
315
1
        assert_eq!(q.pop(), 37);
316
1
        assert_eq!(q.pop(), 48);
317
1
    }
crossbeam_epoch::sync::queue::test::push_pop_2
Line
Count
Source
311
1
    fn push_pop_2() {
312
1
        let q: Queue<i64> = Queue::new();
313
1
        q.push(37);
314
1
        q.push(48);
315
1
        assert_eq!(q.pop(), 37);
316
1
        assert_eq!(q.pop(), 48);
317
1
    }
318
319
    #[test]
320
1
    fn push_pop_many_seq() {
crossbeam_epoch::sync::queue::test::push_pop_many_seq::{closure#0}
Line
Count
Source
320
1
    fn push_pop_many_seq() {
321
1
        let q: Queue<i64> = Queue::new();
322
1
        assert!(q.is_empty());
323
200
        for i in 0..200 {
324
200
            q.push(i)
325
        }
326
1
        assert!(!q.is_empty());
327
200
        for i in 0..200 {
328
200
            assert_eq!(q.pop(), i);
329
        }
330
1
        assert!(q.is_empty());
331
1
    }
crossbeam_epoch::sync::queue::test::push_pop_many_seq
Line
Count
Source
320
1
    fn push_pop_many_seq() {
321
1
        let q: Queue<i64> = Queue::new();
322
1
        assert!(q.is_empty());
323
200
        for i in 0..200 {
324
200
            q.push(i)
325
        }
326
1
        assert!(!q.is_empty());
327
200
        for i in 0..200 {
328
200
            assert_eq!(q.pop(), i);
329
        }
330
1
        assert!(q.is_empty());
331
1
    }
332
333
    #[test]
334
1
    fn push_try_pop_many_spsc() {
crossbeam_epoch::sync::queue::test::push_try_pop_many_spsc::{closure#0}
Line
Count
Source
334
1
    fn push_try_pop_many_spsc() {
335
1
        let q: Queue<i64> = Queue::new();
336
1
        assert!(q.is_empty());
337
338
1
        thread::scope(|scope| {
339
1
            scope.spawn(|_| {
340
1
                let mut next = 0;
341
342
1.02M
                while next < CONC_COUNT {
343
1.02M
                    if let Some(
elem1.00M
) = q.try_pop() {
344
0
                        assert_eq!(elem, next);
345
1.00M
                        next += 1;
346
20.4k
                    }
347
                }
348
1
            });
349
350
1.00M
            for i in 0..CONC_COUNT {
351
1.00M
                q.push(i)
352
            }
353
1
        })
354
1
        .unwrap();
355
1
    }
crossbeam_epoch::sync::queue::test::push_try_pop_many_spsc
Line
Count
Source
334
1
    fn push_try_pop_many_spsc() {
335
1
        let q: Queue<i64> = Queue::new();
336
1
        assert!(q.is_empty());
337
338
1
        thread::scope(|scope| {
339
            scope.spawn(|_| {
340
                let mut next = 0;
341
342
                while next < CONC_COUNT {
343
                    if let Some(elem) = q.try_pop() {
344
                        assert_eq!(elem, next);
345
                        next += 1;
346
                    }
347
                }
348
            });
349
350
            for i in 0..CONC_COUNT {
351
                q.push(i)
352
            }
353
1
        })
354
1
        .unwrap();
355
1
    }
356
357
    #[test]
358
1
    fn push_try_pop_many_spmc() {
crossbeam_epoch::sync::queue::test::push_try_pop_many_spmc::{closure#0}
Line
Count
Source
358
1
    fn push_try_pop_many_spmc() {
359
3
        fn recv(_t: i32, q: &Queue<i64>) {
360
3
            let mut cur = -1;
361
2.92M
            for _i in 0..CONC_COUNT {
362
2.92M
                if let Some(
elem854k
) = q.try_pop() {
363
854k
                    assert!(elem > cur);
364
854k
                    cur = elem;
365
854k
366
854k
                    if cur == CONC_COUNT - 1 {
367
0
                        break;
368
854k
                    }
369
2.06M
                }
370
            }
371
3
        }
372
373
1
        let q: Queue<i64> = Queue::new();
374
1
        assert!(q.is_empty());
375
1
        thread::scope(|scope| {
376
3
            for i in 0..3 {
377
3
                let q = &q;
378
3
                scope.spawn(move |_| recv(i, q));
379
3
            }
380
381
1
            scope.spawn(|_| {
382
1.00M
                for i in 0..CONC_COUNT {
383
1.00M
                    q.push(i);
384
1.00M
                }
385
1
            });
386
1
        })
387
1
        .unwrap();
388
1
    }
crossbeam_epoch::sync::queue::test::push_try_pop_many_spmc
Line
Count
Source
358
1
    fn push_try_pop_many_spmc() {
359
        fn recv(_t: i32, q: &Queue<i64>) {
360
            let mut cur = -1;
361
            for _i in 0..CONC_COUNT {
362
                if let Some(elem) = q.try_pop() {
363
                    assert!(elem > cur);
364
                    cur = elem;
365
366
                    if cur == CONC_COUNT - 1 {
367
                        break;
368
                    }
369
                }
370
            }
371
        }
372
373
1
        let q: Queue<i64> = Queue::new();
374
1
        assert!(q.is_empty());
375
1
        thread::scope(|scope| {
376
            for i in 0..3 {
377
                let q = &q;
378
                scope.spawn(move |_| recv(i, q));
379
            }
380
381
            scope.spawn(|_| {
382
                for i in 0..CONC_COUNT {
383
                    q.push(i);
384
                }
385
            });
386
1
        })
387
1
        .unwrap();
388
1
    }
389
390
    #[test]
391
1
    fn push_try_pop_many_mpmc() {
crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc::{closure#0}
Line
Count
Source
391
1
    fn push_try_pop_many_mpmc() {
392
        enum LR {
393
            Left(i64),
394
            Right(i64),
395
        }
396
397
1
        let q: Queue<LR> = Queue::new();
398
1
        assert!(q.is_empty());
399
400
1
        thread::scope(|scope| {
401
2
            for _t in 0..2 {
402
2
                scope.spawn(|_| {
403
2
                    for i in CONC_COUNT - 1..CONC_COUNT {
404
2
                        q.push(LR::Left(i))
405
                    }
406
2
                });
407
2
                scope.spawn(|_| {
408
2
                    for i in CONC_COUNT - 1..CONC_COUNT {
409
2
                        q.push(LR::Right(i))
410
                    }
411
2
                });
412
2
                scope.spawn(|_| {
413
2
                    let mut vl = vec![];
414
2
                    let mut vr = vec![];
415
1.97M
                    for _i in 0..CONC_COUNT {
416
1.97M
                        match q.try_pop() {
417
2
                            Some(LR::Left(x)) => vl.push(x),
418
2
                            Some(LR::Right(x)) => vr.push(x),
419
1.96M
                            _ => {}
420
                        }
421
                    }
422
423
2
                    let mut vl2 = vl.clone();
424
2
                    let mut vr2 = vr.clone();
425
2
                    vl2.sort();
426
2
                    vr2.sort();
427
428
2
                    assert_eq!(vl, vl2);
429
0
                    assert_eq!(vr, vr2);
430
2
                });
431
2
            }
432
1
        })
433
1
        .unwrap();
434
1
    }
crossbeam_epoch::sync::queue::test::push_try_pop_many_mpmc
Line
Count
Source
391
1
    fn push_try_pop_many_mpmc() {
392
        enum LR {
393
            Left(i64),
394
            Right(i64),
395
        }
396
397
1
        let q: Queue<LR> = Queue::new();
398
1
        assert!(q.is_empty());
399
400
1
        thread::scope(|scope| {
401
            for _t in 0..2 {
402
                scope.spawn(|_| {
403
                    for i in CONC_COUNT - 1..CONC_COUNT {
404
                        q.push(LR::Left(i))
405
                    }
406
                });
407
                scope.spawn(|_| {
408
                    for i in CONC_COUNT - 1..CONC_COUNT {
409
                        q.push(LR::Right(i))
410
                    }
411
                });
412
                scope.spawn(|_| {
413
                    let mut vl = vec![];
414
                    let mut vr = vec![];
415
                    for _i in 0..CONC_COUNT {
416
                        match q.try_pop() {
417
                            Some(LR::Left(x)) => vl.push(x),
418
                            Some(LR::Right(x)) => vr.push(x),
419
                            _ => {}
420
                        }
421
                    }
422
423
                    let mut vl2 = vl.clone();
424
                    let mut vr2 = vr.clone();
425
                    vl2.sort();
426
                    vr2.sort();
427
428
                    assert_eq!(vl, vl2);
429
                    assert_eq!(vr, vr2);
430
                });
431
            }
432
1
        })
433
1
        .unwrap();
434
1
    }
435
436
    #[test]
437
1
    fn push_pop_many_spsc() {
438
1
        let q: Queue<i64> = Queue::new();
439
1
440
1
        thread::scope(|scope| {
441
1
            scope.spawn(|_| {
442
1
                let mut next = 0;
443
1.00M
                while next < CONC_COUNT {
444
1.00M
                    assert_eq!(q.pop(), next);
445
1.00M
                    next += 1;
446
                }
447
1
            });
448
449
1.00M
            for i in 0..CONC_COUNT {
450
1.00M
                q.push(i)
451
            }
452
1
        })
453
1
        .unwrap();
454
1
        assert!(q.is_empty());
455
1
    }
456
457
    #[test]
458
1
    fn is_empty_dont_pop() {
crossbeam_epoch::sync::queue::test::is_empty_dont_pop::{closure#0}
Line
Count
Source
458
1
    fn is_empty_dont_pop() {
459
1
        let q: Queue<i64> = Queue::new();
460
1
        q.push(20);
461
1
        q.push(20);
462
1
        assert!(!q.is_empty());
463
1
        assert!(!q.is_empty());
464
1
        assert!(q.try_pop().is_some());
465
1
    }
crossbeam_epoch::sync::queue::test::is_empty_dont_pop
Line
Count
Source
458
1
    fn is_empty_dont_pop() {
459
1
        let q: Queue<i64> = Queue::new();
460
1
        q.push(20);
461
1
        q.push(20);
462
1
        assert!(!q.is_empty());
463
1
        assert!(!q.is_empty());
464
1
        assert!(q.try_pop().is_some());
465
1
    }
466
}